diff --git a/VERSION.txt b/VERSION.txt index 0c63546b804..259865a698a 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -414,6 +414,8 @@ jetty-10.0.18 - 26 October 2023 + 10537 HTTP/3: Incomplete Data Transfer When Used with Spring Boot WebFlux + 10696 jetty.sh doesn't work with JETTY_USER in Jetty 10.0.17 thru Jetty 12.0.2 + + 10669 Provide ability to defer initial deployment of webapps until after + Server has started + 10705 Creating a `HTTP3ServerConnector` with a `SslContextFactory` that has a non-null `SSLContext` makes the server fail to start with an unclear error message diff --git a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java index fad96cac277..d406ee1d3bd 100644 --- a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java +++ b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java @@ -347,17 +347,10 @@ public class ContentDocs } @Override - public void succeeded() + protected void onSuccess() { // After every successful write, release the chunk. chunk.release(); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - super.failed(x); } @Override diff --git a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java index fad96cac277..d406ee1d3bd 100644 --- a/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java +++ b/documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java @@ -347,17 +347,10 @@ public class ContentDocs } @Override - public void succeeded() + protected void onSuccess() { // After every successful write, release the chunk. chunk.release(); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - super.failed(x); } @Override diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java index c60607572a7..551a210ae0e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java @@ -543,7 +543,7 @@ public abstract class HttpSender } @Override - public void succeeded() + protected void onSuccess() { boolean proceed = true; if (committed) @@ -588,8 +588,6 @@ public abstract class HttpSender // There was some concurrent error, terminate. complete = true; } - - super.succeeded(); } @Override diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java index 8fff51d6454..1f6bc4a3f75 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpSenderOverHTTP.java @@ -235,17 +235,9 @@ public class HttpSenderOverHTTP extends HttpSender } @Override - public void succeeded() + protected void onSuccess() { release(); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - release(); - super.failed(x); } @Override @@ -259,6 +251,7 @@ public class HttpSenderOverHTTP extends HttpSender protected void onCompleteFailure(Throwable cause) { super.onCompleteFailure(cause); + release(); callback.failed(cause); } diff --git a/jetty-core/jetty-deploy/src/main/config/etc/jetty-core-deploy.xml b/jetty-core/jetty-deploy/src/main/config/etc/jetty-core-deploy.xml index bb55b9aa561..551fcc99b9a 100644 --- a/jetty-core/jetty-deploy/src/main/config/etc/jetty-core-deploy.xml +++ b/jetty-core/jetty-deploy/src/main/config/etc/jetty-core-deploy.xml @@ -1,11 +1,7 @@ - - - - - + @@ -20,7 +16,7 @@ - + core diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java index 0093517653a..9c64ffb7794 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java @@ -101,12 +101,11 @@ public class Flusher } @Override - public void succeeded() + protected void onSuccess() { if (active != null) active.succeeded(); active = null; - super.succeeded(); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index 73e95969371..1d3849f066f 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -295,7 +295,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable } @Override - public void succeeded() + protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("Written - entries processed/pending {}/{}: {}/{}", @@ -304,7 +304,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable processedEntries, pendingEntries); finish(); - super.succeeded(); } private void finish() diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java index 439b34fdf4f..59827ebe195 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java @@ -513,17 +513,15 @@ public class RawHTTP2ProxyTest } @Override - public void succeeded() + protected void onSuccess() { frameInfo.callback.succeeded(); - super.succeeded(); } @Override - public void failed(Throwable failure) + protected void onCompleteFailure(Throwable cause) { - frameInfo.callback.failed(failure); - super.failed(failure); + frameInfo.callback.failed(cause); } @Override @@ -669,17 +667,15 @@ public class RawHTTP2ProxyTest } @Override - public void succeeded() + protected void onSuccess() { frameInfo.callback.succeeded(); - super.succeeded(); } @Override - public void failed(Throwable failure) + protected void onCompleteFailure(Throwable cause) { - frameInfo.callback.failed(failure); - super.failed(failure); + frameInfo.callback.failed(cause); } private void offer(Stream stream, Frame frame, Callback callback) diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java index 3e4aa1ca195..deee66c03b7 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/ControlFlusher.java @@ -108,7 +108,7 @@ public class ControlFlusher extends IteratingCallback } @Override - public void succeeded() + protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("succeeded to write {} on {}", entries, this); @@ -119,8 +119,6 @@ public class ControlFlusher extends IteratingCallback entries.clear(); invocationType = InvocationType.NON_BLOCKING; - - super.succeeded(); } @Override diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java index 79bca5470c3..34422362062 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java @@ -102,6 +102,15 @@ public class InstructionFlusher extends IteratingCallback return Action.SCHEDULED; } + @Override + protected void onSuccess() + { + if (LOG.isDebugEnabled()) + LOG.debug("succeeded to write {} buffers on {}", accumulator.getByteBuffers().size(), this); + + accumulator.reset(); + } + @Override protected void onCompleteSuccess() { diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java index 59427685526..f450bfaf0ec 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/MessageFlusher.java @@ -75,7 +75,7 @@ public class MessageFlusher extends IteratingCallback return Action.SCHEDULED; } - int generated = generator.generate(accumulator, entry.endPoint.getStreamId(), frame, this::failed); + int generated = generator.generate(accumulator, entry.endPoint.getStreamId(), frame, this::onGenerateFailure); if (generated < 0) return Action.SCHEDULED; @@ -88,33 +88,48 @@ public class MessageFlusher extends IteratingCallback return Action.SCHEDULED; } + private void onGenerateFailure(Throwable cause) + { + if (LOG.isDebugEnabled()) + LOG.debug("failed to generate {} on {}", entry, this, cause); + + accumulator.release(); + + entry.callback.failed(cause); + entry = null; + + // Continue the iteration. + succeeded(); + } + @Override - public void succeeded() + protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("succeeded to write {} on {}", entry, this); accumulator.release(); - entry.callback.succeeded(); - entry = null; - - super.succeeded(); + if (entry != null) + { + entry.callback.succeeded(); + entry = null; + } } @Override - public void failed(Throwable x) + protected void onCompleteFailure(Throwable cause) { if (LOG.isDebugEnabled()) - LOG.debug("failed to write {} on {}", entry, this, x); + LOG.debug("failed to write {} on {}", entry, this, cause); accumulator.release(); - entry.callback.failed(x); - entry = null; - - // Continue the iteration. - super.succeeded(); + if (entry != null) + { + entry.callback.failed(cause); + entry = null; + } } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index c1cf7c49066..01cbe9a3515 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -20,13 +20,13 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; -import java.util.function.Consumer; import java.util.function.IntUnaryOperator; import java.util.stream.Collectors; @@ -205,24 +205,49 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable // No bucket, return non-pooled. if (bucket == null) - return newRetainableByteBuffer(size, direct, null); + return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct)); bucket.recordAcquire(); // Try to acquire a pooled entry. Pool.Entry entry = bucket.getPool().acquire(); - if (entry != null) + if (entry == null) { - bucket.recordPooled(); - RetainableByteBuffer.Pooled buffer = entry.getPooled(); - ((PooledBuffer)buffer).acquire(); - return buffer; + ByteBuffer buffer = BufferUtil.allocate(bucket.getCapacity(), direct); + return new ReservedBuffer(buffer, bucket); } - return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer)); + bucket.recordPooled(); + RetainableByteBuffer.Pooled buffer = entry.getPooled(); + ((PooledBuffer)buffer).acquire(); + return buffer; } - private void reserve(RetainedBucket bucket, RetainableByteBuffer.Pooled buffer) + @Override + public boolean removeAndRelease(RetainableByteBuffer buffer) + { + RetainableByteBuffer actual = buffer; + while (actual instanceof RetainableByteBuffer.Wrapper wrapper) + actual = wrapper.getWrapped(); + + if (actual instanceof ReservedBuffer reservedBuffer) + { + // remove the actual reserved buffer, but release the wrapped buffer + reservedBuffer.remove(); + return buffer.release(); + } + + if (actual instanceof PooledBuffer poolBuffer) + { + // remove the actual pool buffer, but release the wrapped buffer + poolBuffer.remove(); + return buffer.release(); + } + + return ByteBufferPool.super.removeAndRelease(buffer); + } + + private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer) { bucket.recordRelease(); @@ -235,12 +260,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable } // Add the buffer to the new entry. - ByteBuffer byteBuffer = buffer.getByteBuffer(); BufferUtil.reset(byteBuffer); - PooledBuffer pooledBuffer = new PooledBuffer(this, byteBuffer, b -> release(bucket, entry)); + PooledBuffer pooledBuffer = new PooledBuffer(byteBuffer, bucket, entry); if (entry.enable(pooledBuffer, false)) { - checkMaxMemory(bucket, buffer.isDirect()); + checkMaxMemory(bucket, byteBuffer.isDirect()); return; } @@ -270,6 +294,13 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable entry.remove(); } + private boolean remove(RetainedBucket bucket, Pool.Entry entry) + { + // Cannot release, discard this buffer. + bucket.recordRemove(); + return entry.remove(); + } + private void checkMaxMemory(RetainedBucket bucket, boolean direct) { long max = direct ? _maxDirectMemory : _maxHeapMemory; @@ -309,14 +340,6 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable } } - private RetainableByteBuffer.Pooled newRetainableByteBuffer(int capacity, boolean direct, Consumer releaser) - { - ByteBuffer buffer = BufferUtil.allocate(capacity, direct); - PooledBuffer retainableByteBuffer = new PooledBuffer(this, buffer, releaser); - retainableByteBuffer.acquire(); - return retainableByteBuffer; - } - public Pool poolFor(int capacity, boolean direct) { RetainedBucket bucket = bucketFor(capacity, direct); @@ -581,20 +604,49 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable } } - private static class PooledBuffer extends RetainableByteBuffer.Pooled + private class ReservedBuffer extends RetainableByteBuffer.Pooled + { + private final RetainedBucket _bucket; + private final AtomicBoolean _removed = new AtomicBoolean(); + + private ReservedBuffer(ByteBuffer buffer, RetainedBucket bucket) + { + super(ArrayByteBufferPool.this, buffer); + _bucket = Objects.requireNonNull(bucket); + } + + @Override + public boolean release() + { + boolean released = super.release(); + if (released && _removed.compareAndSet(false, true)) + reserve(_bucket, getByteBuffer()); + return released; + } + + void remove() + { + // Buffer never added to pool, so just prevent future reservation + _removed.compareAndSet(false, true); + } + } + + private class PooledBuffer extends RetainableByteBuffer.Pooled { - private final Consumer _releaser; private final ReferenceCounter _referenceCounter; + private final RetainedBucket _bucket; + private final Pool.Entry _entry; private int _usages; - private PooledBuffer(ByteBufferPool pool, ByteBuffer buffer, Consumer releaser) + private PooledBuffer(ByteBuffer buffer, RetainedBucket bucket, Pool.Entry entry) { - super(pool, buffer, new ReferenceCounter(0)); + super(ArrayByteBufferPool.this, buffer, new ReferenceCounter(0)); if (getWrapped() instanceof ReferenceCounter referenceCounter) _referenceCounter = referenceCounter; else throw new IllegalArgumentException(); - this._releaser = releaser; + _bucket = Objects.requireNonNull(bucket); + _entry = Objects.requireNonNull(entry); } @Override @@ -602,13 +654,15 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable { boolean released = super.release(); if (released) - { - if (_releaser != null) - _releaser.accept(this); - } + ArrayByteBufferPool.this.release(_bucket, _entry); return released; } + void remove() + { + ArrayByteBufferPool.this.remove(_bucket, _entry); + } + private int use() { if (++_usages < 0) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 079aec92128..f1b77e4d9c0 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -58,6 +58,20 @@ public interface ByteBufferPool */ RetainableByteBuffer.Mutable acquire(int size, boolean direct); + /** + * {@link RetainableByteBuffer#release() Release} the buffer in a way that will remove it from any pool that it may be in. + * If the buffer is not in a pool, calling this method is equivalent to calling {@link RetainableByteBuffer#release()}. + * Calling this method satisfies any contract that requires a call to {@link RetainableByteBuffer#release()}. + * @return {@code true} if a call to {@link RetainableByteBuffer#release()} would have returned {@code true}. + * @see RetainableByteBuffer#release() + * @deprecated This API is experimental and may be removed in future releases + */ + @Deprecated + default boolean removeAndRelease(RetainableByteBuffer buffer) + { + return buffer != null && buffer.release(); + } + /** *

Removes all {@link RetainableByteBuffer#isRetained() non-retained} * pooled instances from this pool.

@@ -161,7 +175,7 @@ public interface ByteBufferPool @Override public RetainableByteBuffer.Mutable acquire(int size, boolean direct) { - return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct)).asMutable(); + return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct)); } @Override diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index 85be1ea59fa..f570d2dd3f2 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -92,7 +92,7 @@ public interface RetainableByteBuffer extends Retainable * @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer} * @see ByteBufferPool#NON_POOLING */ - static RetainableByteBuffer wrap(ByteBuffer byteBuffer) + static RetainableByteBuffer.Mutable wrap(ByteBuffer byteBuffer) { return new FixedCapacity(byteBuffer); } @@ -106,7 +106,7 @@ public interface RetainableByteBuffer extends Retainable * @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer} * @see ByteBufferPool#NON_POOLING */ - static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Retainable retainable) + static RetainableByteBuffer.Mutable wrap(ByteBuffer byteBuffer, Retainable retainable) { return new FixedCapacity(byteBuffer, retainable); } @@ -119,7 +119,7 @@ public interface RetainableByteBuffer extends Retainable * @param releaser a {@link Runnable} to call when the buffer is released. * @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer} */ - static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Runnable releaser) + static RetainableByteBuffer.Mutable wrap(ByteBuffer byteBuffer, Runnable releaser) { return new FixedCapacity(byteBuffer) { @@ -2009,7 +2009,7 @@ public interface RetainableByteBuffer extends Retainable byteBuffer.limit(limit + Math.toIntExact(space)); byteBuffer = byteBuffer.slice(); byteBuffer.limit(limit); - _aggregate = RetainableByteBuffer.wrap(byteBuffer, _aggregate).asMutable(); + _aggregate = RetainableByteBuffer.wrap(byteBuffer, _aggregate); } } diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 44e0a3110d8..25e424f47bc 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -445,4 +446,43 @@ public class ArrayByteBufferPoolTest assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE)); assertThat(compoundPool.getSecondaryPool().size(), is(0)); } + + @Test + public void testRemoveAndRelease() + { + ArrayByteBufferPool pool = new ArrayByteBufferPool(); + + RetainableByteBuffer reserved0 = pool.acquire(1024, false); + RetainableByteBuffer reserved1 = pool.acquire(1024, false); + + RetainableByteBuffer acquired0 = pool.acquire(1024, false); + acquired0.release(); + acquired0 = pool.acquire(1024, false); + RetainableByteBuffer acquired1 = pool.acquire(1024, false); + acquired1.release(); + acquired1 = pool.acquire(1024, false); + + RetainableByteBuffer retained0 = pool.acquire(1024, false); + retained0.release(); + retained0 = pool.acquire(1024, false); + retained0.retain(); + RetainableByteBuffer retained1 = pool.acquire(1024, false); + retained1.release(); + retained1 = pool.acquire(1024, false); + retained1.retain(); + + assertTrue(pool.removeAndRelease(reserved1)); + assertTrue(pool.removeAndRelease(acquired1)); + assertFalse(pool.removeAndRelease(retained1)); + assertTrue(retained1.release()); + + assertThat(pool.getHeapByteBufferCount(), is(2L)); + assertTrue(reserved0.release()); + assertThat(pool.getHeapByteBufferCount(), is(3L)); + assertTrue(acquired0.release()); + assertThat(pool.getHeapByteBufferCount(), is(3L)); + assertFalse(retained0.release()); + assertTrue(retained0.release()); + assertThat(pool.getHeapByteBufferCount(), is(3L)); + } } diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index c5ff003db1b..e5c1d340b65 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -17,11 +17,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayDeque; import java.util.Collection; import java.util.EventListener; import java.util.List; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -39,7 +40,6 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; @@ -344,26 +344,19 @@ public abstract class QuicConnection extends AbstractConnection private class Flusher extends IteratingCallback { - private final AutoLock lock = new AutoLock(); - private final ArrayDeque queue = new ArrayDeque<>(); + private final Queue queue = new ConcurrentLinkedQueue<>(); private Entry entry; public void offer(Callback callback, SocketAddress address, ByteBuffer[] buffers) { - try (AutoLock l = lock.lock()) - { - queue.offer(new Entry(callback, address, buffers)); - } + queue.offer(new Entry(callback, address, buffers)); iterate(); } @Override protected Action process() { - try (AutoLock l = lock.lock()) - { - entry = queue.poll(); - } + entry = queue.poll(); if (entry == null) return Action.IDLE; @@ -372,17 +365,9 @@ public abstract class QuicConnection extends AbstractConnection } @Override - public void succeeded() + protected void onSuccess() { entry.callback.succeeded(); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - entry.callback.failed(x); - super.failed(x); } @Override @@ -394,10 +379,11 @@ public abstract class QuicConnection extends AbstractConnection @Override protected void onCompleteFailure(Throwable cause) { + entry.callback.failed(cause); QuicConnection.this.close(); } - private class Entry + private static class Entry { private final Callback callback; private final SocketAddress address; diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 8f998fa8743..75ee111e659 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -521,12 +521,11 @@ public abstract class QuicSession extends ContainerLifeCycle } @Override - public void succeeded() + protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("written cipher bytes on {}", QuicSession.this); cipherBuffer.release(); - super.succeeded(); } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 76082dbba5f..51fdb7e1011 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -760,17 +760,11 @@ public class ConnectHandler extends Handler.Wrapper } @Override - public void succeeded() + protected void onSuccess() { if (LOG.isDebugEnabled()) LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this); buffer.release(); - super.succeeded(); - } - - @Override - protected void onCompleteSuccess() - { } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java index ee0432b3116..d32e4255c39 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.eclipse.jetty.http.HttpException; @@ -39,14 +40,15 @@ import org.eclipse.jetty.http.MimeTypes.Type; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.http.QuotedQualityCSV; import org.eclipse.jetty.io.ByteBufferOutputStream; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; -import org.eclipse.jetty.io.Retainable; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -198,7 +200,8 @@ public class ErrorHandler implements Request.Handler int bufferSize = request.getConnectionMetaData().getHttpConfiguration().getOutputBufferSize(); bufferSize = Math.min(8192, bufferSize); // TODO ? - RetainableByteBuffer buffer = request.getComponents().getByteBufferPool().acquire(bufferSize, false); + ByteBufferPool byteBufferPool = request.getComponents().getByteBufferPool(); + RetainableByteBuffer buffer = byteBufferPool.acquire(bufferSize, false); try { @@ -251,13 +254,14 @@ public class ErrorHandler implements Request.Handler } response.getHeaders().put(type.getContentTypeField(charset)); - response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, buffer)); + response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, byteBufferPool, buffer)); return true; } catch (Throwable x) { - buffer.release(); + if (buffer != null) + byteBufferPool.removeAndRelease(buffer); throw x; } } @@ -579,20 +583,33 @@ public class ErrorHandler implements Request.Handler * when calling {@link Response#write(boolean, ByteBuffer, Callback)} to wrap the passed in {@link Callback} * so that the {@link RetainableByteBuffer} used can be released. */ - private static class WriteErrorCallback extends Callback.Nested + private static class WriteErrorCallback implements Callback { - private final Retainable _retainable; + private final AtomicReference _callback; + private final ByteBufferPool _pool; + private final RetainableByteBuffer _buffer; - public WriteErrorCallback(Callback callback, Retainable retainable) + public WriteErrorCallback(Callback callback, ByteBufferPool pool, RetainableByteBuffer retainable) { - super(callback); - _retainable = retainable; + _callback = new AtomicReference<>(callback); + _pool = pool; + _buffer = retainable; } @Override - public void completed() + public void succeeded() { - _retainable.release(); + Callback callback = _callback.getAndSet(null); + if (callback != null) + ExceptionUtil.callAndThen(_buffer::release, callback::succeeded); + } + + @Override + public void failed(Throwable x) + { + Callback callback = _callback.getAndSet(null); + if (callback != null) + ExceptionUtil.callAndThen(x, t -> _pool.removeAndRelease(_buffer), callback::failed); } } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java index ed8e2b9c483..a3a7ecf9a46 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/CustomTransportTest.java @@ -237,7 +237,8 @@ public class CustomTransportTest channels.put(channel.id, channel); // Register for read interest with the EndPoint. - endPoint.fillInterested(new EndPointToChannelCallback(channel)); + EndPointToChannelCallback endPointToChannelCallback = new EndPointToChannelCallback(channel); + endPoint.fillInterested(Callback.from(endPointToChannelCallback::iterate)); } // Called when there data to read from the Gateway on the given Channel. @@ -322,18 +323,10 @@ public class CustomTransportTest endPoint.fillInterested(this); return Action.IDLE; } - channel.write(this, buffer); + channel.write(Callback.from(this::iterate), buffer); return Action.SCHEDULED; } - @Override - public void succeeded() - { - // There is data to read from the EndPoint. - // Iterate to read it and send it to the Gateway. - iterate(); - } - @Override protected void onCompleteSuccess() { diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 9ada006fab6..155577f58e2 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -167,6 +167,18 @@ public abstract class IteratingCallback implements Callback */ protected abstract Action process() throws Throwable; + /** + * Invoked when one task has completed successfully, either by the + * caller thread or by the processing thread. This invocation is + * always serialized w.r.t the execution of {@link #process()}. + *

+ * This method is not invoked when a call to {@link #abort(Throwable)} + * is made before the {@link #succeeded()} callback happens. + */ + protected void onSuccess() + { + } + /** * Invoked when the overall task has completed successfully. * @@ -255,6 +267,7 @@ public abstract class IteratingCallback implements Callback // Fall through to possibly invoke onCompleteFailure(). } + boolean callOnSuccess = false; // acted on the action we have just received try (AutoLock ignored = _lock.lock()) { @@ -305,6 +318,7 @@ public abstract class IteratingCallback implements Callback case CALLED: { + callOnSuccess = true; if (action != Action.SCHEDULED) throw new IllegalStateException(String.format("%s[action=%s]", this, action)); // we lost the race, so we have to keep processing @@ -327,6 +341,11 @@ public abstract class IteratingCallback implements Callback throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } } + finally + { + if (callOnSuccess) + onSuccess(); + } } if (notifyCompleteSuccess) @@ -338,8 +357,11 @@ public abstract class IteratingCallback implements Callback /** * Method to invoke when the asynchronous sub-task succeeds. *

- * Subclasses that override this method must always remember - * to call {@code super.succeeded()}. + * This method should be considered final for all practical purposes. + *

+ * Eventually, {@link #onSuccess()} is + * called, either by the caller thread or by the processing + * thread. */ @Override public void succeeded() @@ -374,7 +396,10 @@ public abstract class IteratingCallback implements Callback } } if (process) + { + onSuccess(); processing(); + } } /** @@ -382,8 +407,7 @@ public abstract class IteratingCallback implements Callback * or to fail the overall asynchronous task and therefore * terminate the iteration. *

- * Subclasses that override this method must always remember - * to call {@code super.failed(Throwable)}. + * This method should be considered final for all practical purposes. *

* Eventually, {@link #onCompleteFailure(Throwable)} is * called, either by the caller thread or by the processing diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java index b1e738a27dd..00f6b007b7d 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/IteratingCallbackTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class IteratingCallbackTest @@ -435,4 +436,28 @@ public class IteratingCallbackTest assertEquals(1, count.get()); } + + @Test + public void testOnSuccessCalledDespiteISE() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + IteratingCallback icb = new IteratingCallback() + { + @Override + protected Action process() + { + succeeded(); + return Action.IDLE; // illegal action + } + + @Override + protected void onSuccess() + { + latch.countDown(); + } + }; + + assertThrows(IllegalStateException.class, icb::iterate); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java index 3a4558e8b1d..11e048ac823 100644 --- a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java +++ b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/AsyncProxyServlet.java @@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet } @Override - public void failed(Throwable x) + protected void onCompleteFailure(Throwable cause) { - super.failed(x); - onError(x); + onError(cause); } } diff --git a/jetty-ee10/jetty-ee10-quickstart/src/main/config/etc/jetty-ee10-quickstart.xml b/jetty-ee10/jetty-ee10-quickstart/src/main/config/etc/jetty-ee10-quickstart.xml index 14788ab785c..a08f0c808c3 100644 --- a/jetty-ee10/jetty-ee10-quickstart/src/main/config/etc/jetty-ee10-quickstart.xml +++ b/jetty-ee10/jetty-ee10-quickstart/src/main/config/etc/jetty-ee10-quickstart.xml @@ -7,7 +7,7 @@ - + diff --git a/jetty-ee10/jetty-ee10-runner/pom.xml b/jetty-ee10/jetty-ee10-runner/pom.xml index 2f723239f05..7a5bb272298 100644 --- a/jetty-ee10/jetty-ee10-runner/pom.xml +++ b/jetty-ee10/jetty-ee10-runner/pom.xml @@ -132,6 +132,9 @@ ${it.debug} true + + org.eclipse.jetty:jetty-client:${project.version} + org.eclipse.jetty.maven.its.ee10.runner test diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java index 62ead4fe53e..498079c1c04 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java @@ -135,6 +135,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private long _written; private long _flushed; private long _firstByteNanoTime = -1; + private ByteBufferPool _pool; private RetainableByteBuffer _aggregate; private int _bufferSize; private int _commitSize; @@ -222,7 +223,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _state = State.CLOSED; closedCallback = _closedCallback; _closedCallback = null; - releaseBuffer(); + lockedReleaseBuffer(failure != null); wake = updateApiState(failure); } else if (_state == State.CLOSE) @@ -444,7 +445,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try (AutoLock ignored = _channelState.lock()) { _state = State.CLOSED; - releaseBuffer(); + lockedReleaseBuffer(failure != null); } } @@ -576,25 +577,36 @@ public class HttpOutput extends ServletOutputStream implements Runnable { try (AutoLock ignored = _channelState.lock()) { - return acquireBuffer().getByteBuffer(); + return lockedAcquireBuffer().getByteBuffer(); } } - private RetainableByteBuffer acquireBuffer() + private RetainableByteBuffer lockedAcquireBuffer() { + assert _channelState.isLockHeldByCurrentThread(); + boolean useOutputDirectByteBuffers = _servletChannel.getConnectionMetaData().getHttpConfiguration().isUseOutputDirectByteBuffers(); - ByteBufferPool pool = _servletChannel.getRequest().getComponents().getByteBufferPool(); + if (_aggregate == null) - _aggregate = pool.acquire(getBufferSize(), useOutputDirectByteBuffers); + { + _pool = _servletChannel.getRequest().getComponents().getByteBufferPool(); + _aggregate = _pool.acquire(getBufferSize(), useOutputDirectByteBuffers); + } return _aggregate; } - private void releaseBuffer() + private void lockedReleaseBuffer(boolean failure) { + assert _channelState.isLockHeldByCurrentThread(); + if (_aggregate != null) { - _aggregate.release(); + if (failure && _pool != null) + _pool.removeAndRelease(_aggregate); + else + _aggregate.release(); _aggregate = null; + _pool = null; } } @@ -752,7 +764,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Should we aggregate? if (aggregate) { - acquireBuffer(); + lockedAcquireBuffer(); int filled = BufferUtil.fill(_aggregate.getByteBuffer(), b, off, len); // return if we are not complete, not full and filled all the content @@ -957,7 +969,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable } _written = written; - acquireBuffer(); + lockedAcquireBuffer(); BufferUtil.append(_aggregate.getByteBuffer(), (byte)b); } @@ -1260,6 +1272,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { try (AutoLock ignored = _channelState.lock()) { + lockedReleaseBuffer(_state != State.CLOSED); _state = State.OPEN; _apiState = ApiState.BLOCKING; _softClose = true; // Stay closed until next request @@ -1268,7 +1281,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable _commitSize = config.getOutputAggregationSize(); if (_commitSize > _bufferSize) _commitSize = _bufferSize; - releaseBuffer(); _written = 0; _writeListener = null; _onError = null; diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java index 0c419cba559..da225815373 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java @@ -227,6 +227,11 @@ public class ServletChannelState return _lock.lock(); } + boolean isLockHeldByCurrentThread() + { + return _lock.isHeldByCurrentThread(); + } + public State getState() { try (AutoLock ignored = lock()) diff --git a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ContextScopeListenerTest.java b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ContextScopeListenerTest.java index fff43243749..4aca923ba8d 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ContextScopeListenerTest.java +++ b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ContextScopeListenerTest.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import jakarta.servlet.AsyncContext; import jakarta.servlet.DispatcherType; @@ -73,7 +72,7 @@ public class ContextScopeListenerTest } @Test - public void testAsyncServlet() throws Exception + public void testAsyncServletAsyncBeforeDoGetExit() throws Exception { _contextHandler.addServlet(new ServletHolder(new HttpServlet() { @@ -87,12 +86,25 @@ public class ContextScopeListenerTest } _history.add("doGet"); + CountDownLatch latch = new CountDownLatch(1); AsyncContext asyncContext = req.startAsync(); asyncContext.start(() -> { _history.add("asyncRunnable"); asyncContext.dispatch("/dispatch"); + latch.countDown(); + // wait until doGet call has exited + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> _history.get(_history.size() - 1).equals("exitScope /initialPath")); }); + + try + { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } }), "/"); @@ -100,13 +112,9 @@ public class ContextScopeListenerTest _contextHandler.addEventListener(new ContextHandler.ContextScopeListener() { - // Use a lock to prevent the async thread running the listener concurrently. - private final ReentrantLock _lock = new ReentrantLock(); - @Override public void enterScope(Context context, Request request) { - _lock.lock(); String pathInContext = (request == null) ? "null" : Request.getPathInContext(request); _history.add("enterScope " + pathInContext); } @@ -129,9 +137,81 @@ public class ContextScopeListenerTest assertHistory( "enterScope /initialPath", "doGet", - "exitScope /initialPath", "enterScope /initialPath", "asyncRunnable", + "asyncDispatch", + "exitScope /initialPath", + "exitScope /initialPath" + ); + } + + @Test + public void testAsyncServletAsyncAfterDoGetExit() throws Exception + { + _contextHandler.addServlet(new ServletHolder(new HttpServlet() + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + { + if (req.getDispatcherType() == DispatcherType.ASYNC) + { + _history.add("asyncDispatch"); + return; + } + + _history.add("doGet"); + CountDownLatch latch = new CountDownLatch(1); + AsyncContext asyncContext = req.startAsync(); + asyncContext.start(() -> + { + latch.countDown(); + // wait until doGet call has exited + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> _history.get(_history.size() - 1).equals("exitScope /initialPath")); + + _history.add("asyncRunnable"); + asyncContext.dispatch("/dispatch"); + }); + + try + { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + + } + }), "/"); + + _contextHandler.addEventListener(new ContextHandler.ContextScopeListener() + { + @Override + public void enterScope(Context context, Request request) + { + String pathInContext = (request == null) ? "null" : Request.getPathInContext(request); + _history.add("enterScope " + pathInContext); + } + + @Override + public void exitScope(Context context, Request request) + { + String pathInContext = (request == null) ? "null" : Request.getPathInContext(request); + _history.add("exitScope " + pathInContext); + } + }); + + URI uri = URI.create("http://localhost:" + _connector.getLocalPort() + "/initialPath"); + ContentResponse response = _client.GET(uri); + assertThat(response.getStatus(), equalTo(HttpStatus.OK_200)); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> _history.size() == 9); + assertHistory( + "enterScope /initialPath", + "doGet", + "enterScope /initialPath", + "exitScope /initialPath", + "asyncRunnable", "exitScope /initialPath", "enterScope /initialPath", "asyncDispatch", diff --git a/jetty-ee10/jetty-ee10-webapp/src/main/config/etc/jetty-ee10-deploy.xml b/jetty-ee10/jetty-ee10-webapp/src/main/config/etc/jetty-ee10-deploy.xml index 53872c8445b..868a0e51462 100644 --- a/jetty-ee10/jetty-ee10-webapp/src/main/config/etc/jetty-ee10-deploy.xml +++ b/jetty-ee10/jetty-ee10-webapp/src/main/config/etc/jetty-ee10-deploy.xml @@ -1,11 +1,7 @@ - - - - - + @@ -20,7 +16,7 @@ - + ee10 @@ -41,6 +37,7 @@ + diff --git a/jetty-ee8/jetty-ee8-quickstart/src/main/config/etc/jetty-ee8-quickstart.xml b/jetty-ee8/jetty-ee8-quickstart/src/main/config/etc/jetty-ee8-quickstart.xml index 1dd6e943f55..4a36d28e556 100644 --- a/jetty-ee8/jetty-ee8-quickstart/src/main/config/etc/jetty-ee8-quickstart.xml +++ b/jetty-ee8/jetty-ee8-quickstart/src/main/config/etc/jetty-ee8-quickstart.xml @@ -7,7 +7,7 @@ - + diff --git a/jetty-ee8/jetty-ee8-runner/pom.xml b/jetty-ee8/jetty-ee8-runner/pom.xml index 0e721307e77..facf5d0ff69 100644 --- a/jetty-ee8/jetty-ee8-runner/pom.xml +++ b/jetty-ee8/jetty-ee8-runner/pom.xml @@ -136,6 +136,9 @@ ${it.debug} true + + org.eclipse.jetty:jetty-client:${project.version} + org.eclipse.jetty.maven.its.ee8.runner test diff --git a/jetty-ee8/jetty-ee8-webapp/src/main/config/etc/jetty-ee8-deploy.xml b/jetty-ee8/jetty-ee8-webapp/src/main/config/etc/jetty-ee8-deploy.xml index 101c003028d..52fb1adae48 100644 --- a/jetty-ee8/jetty-ee8-webapp/src/main/config/etc/jetty-ee8-deploy.xml +++ b/jetty-ee8/jetty-ee8-webapp/src/main/config/etc/jetty-ee8-deploy.xml @@ -1,11 +1,7 @@ - - - - - + @@ -20,7 +16,7 @@ - + ee8 diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java index 7fdc4184b06..b05626a4424 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java @@ -158,6 +158,11 @@ public class HttpChannelState return _lock.lock(); } + boolean isLockHeldByCurrentThread() + { + return _lock.isHeldByCurrentThread(); + } + public State getState() { try (AutoLock l = lock()) diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java index e00db387d35..f85fba69161 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java @@ -32,6 +32,7 @@ import jakarta.servlet.ServletRequest; import jakarta.servlet.ServletResponse; import jakarta.servlet.WriteListener; import org.eclipse.jetty.http.content.HttpContent; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.IOResources; @@ -198,6 +199,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private long _written; private long _flushed; private long _firstByteNanoTime = -1; + private ByteBufferPool _pool; private RetainableByteBuffer _aggregate; private int _bufferSize; private int _commitSize; @@ -299,7 +301,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable _state = State.CLOSED; closedCallback = _closedCallback; _closedCallback = null; - releaseBuffer(); + lockedReleaseBuffer(failure != null); wake = updateApiState(failure); } else if (_state == State.CLOSE) @@ -520,7 +522,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable try (AutoLock l = _channelState.lock()) { _state = State.CLOSED; - releaseBuffer(); + lockedReleaseBuffer(failure != null); } } @@ -649,23 +651,34 @@ public class HttpOutput extends ServletOutputStream implements Runnable { try (AutoLock l = _channelState.lock()) { - return acquireBuffer().getByteBuffer(); + return lockedAcquireBuffer().getByteBuffer(); } } - private RetainableByteBuffer acquireBuffer() + private RetainableByteBuffer lockedAcquireBuffer() { + assert _channelState.isLockHeldByCurrentThread(); + if (_aggregate == null) - _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers()); + { + _pool = _channel.getByteBufferPool(); + _aggregate = _pool.acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers()); + } return _aggregate; } - private void releaseBuffer() + private void lockedReleaseBuffer(boolean failure) { + assert _channelState.isLockHeldByCurrentThread(); + if (_aggregate != null) { - _aggregate.release(); + if (failure && _pool != null) + _pool.removeAndRelease(_aggregate); + else + _aggregate.release(); _aggregate = null; + _pool = null; } } @@ -828,7 +841,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Should we aggregate? if (aggregate) { - acquireBuffer(); + lockedAcquireBuffer(); int filled = BufferUtil.fill(_aggregate.getByteBuffer(), b, off, len); // return if we are not complete, not full and filled all the content @@ -1033,7 +1046,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable } _written = written; - acquireBuffer(); + lockedAcquireBuffer(); BufferUtil.append(_aggregate.getByteBuffer(), (byte)b); } @@ -1438,6 +1451,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { try (AutoLock l = _channelState.lock()) { + lockedReleaseBuffer(_state != State.CLOSED); _state = State.OPEN; _apiState = ApiState.BLOCKING; _softClose = true; // Stay closed until next request @@ -1447,7 +1461,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable _commitSize = config.getOutputAggregationSize(); if (_commitSize > _bufferSize) _commitSize = _bufferSize; - releaseBuffer(); _written = 0; _writeListener = null; _onError = null; diff --git a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java index 9c9f87e8255..321da55fd15 100644 --- a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java +++ b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/AsyncProxyServlet.java @@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet } @Override - public void failed(Throwable x) + protected void onCompleteFailure(Throwable cause) { - super.failed(x); - onError(x); + onError(cause); } } diff --git a/jetty-ee9/jetty-ee9-quickstart/src/main/config/etc/jetty-ee9-quickstart.xml b/jetty-ee9/jetty-ee9-quickstart/src/main/config/etc/jetty-ee9-quickstart.xml index 91606ec0ea5..e47a22440b1 100644 --- a/jetty-ee9/jetty-ee9-quickstart/src/main/config/etc/jetty-ee9-quickstart.xml +++ b/jetty-ee9/jetty-ee9-quickstart/src/main/config/etc/jetty-ee9-quickstart.xml @@ -7,7 +7,7 @@ - + diff --git a/jetty-ee9/jetty-ee9-runner/pom.xml b/jetty-ee9/jetty-ee9-runner/pom.xml index 18d4d870051..6b81966404a 100644 --- a/jetty-ee9/jetty-ee9-runner/pom.xml +++ b/jetty-ee9/jetty-ee9-runner/pom.xml @@ -134,6 +134,9 @@ ${it.debug} true + + org.eclipse.jetty:jetty-client:${project.version} + org.eclipse.jetty.maven.its.ee9.runner test diff --git a/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java b/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java index 3ae532c74bf..765995e980c 100644 --- a/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java +++ b/jetty-ee9/jetty-ee9-servlet/src/test/java/org/eclipse/jetty/ee9/servlet/ContextScopeListenerTest.java @@ -17,7 +17,8 @@ import java.net.URI; import java.util.Arrays; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import jakarta.servlet.AsyncContext; import jakarta.servlet.DispatcherType; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ContextScopeListenerTest { @@ -83,24 +85,31 @@ public class ContextScopeListenerTest } _history.add("doGet"); + CountDownLatch latch = new CountDownLatch(1); AsyncContext asyncContext = req.startAsync(); asyncContext.start(() -> { _history.add("asyncRunnable"); asyncContext.dispatch("/dispatch"); + latch.countDown(); }); + + try + { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } }), "/"); _contextHandler.addEventListener(new ContextHandler.ContextScopeListener() { - // Use a lock to prevent the async thread running the listener concurrently. - private final ReentrantLock _lock = new ReentrantLock(); - @Override public void enterScope(ContextHandler.APIContext context, org.eclipse.jetty.ee9.nested.Request request, Object reason) { - _lock.lock(); String pathInContext = (request == null) ? "null" : URIUtil.addPaths(request.getServletPath(), request.getPathInfo()); _history.add("enterScope " + pathInContext); } @@ -110,7 +119,6 @@ public class ContextScopeListenerTest { String pathInContext = (request == null) ? "null" : URIUtil.addPaths(request.getServletPath(), request.getPathInfo()); _history.add("exitScope " + pathInContext); - _lock.unlock(); } }); @@ -120,14 +128,10 @@ public class ContextScopeListenerTest assertHistory( "enterScope /initialPath", "doGet", - "exitScope /initialPath", "enterScope /initialPath", "asyncRunnable", "exitScope /initialPath", - "enterScope /dispatch", "asyncDispatch", - "exitScope /dispatch", - "enterScope /dispatch", "exitScope /dispatch" ); } diff --git a/jetty-ee9/jetty-ee9-webapp/src/main/config/etc/jetty-ee9-deploy.xml b/jetty-ee9/jetty-ee9-webapp/src/main/config/etc/jetty-ee9-deploy.xml index a0be5ab89f4..d4f60b484b1 100644 --- a/jetty-ee9/jetty-ee9-webapp/src/main/config/etc/jetty-ee9-deploy.xml +++ b/jetty-ee9/jetty-ee9-webapp/src/main/config/etc/jetty-ee9-deploy.xml @@ -1,11 +1,7 @@ - - - - - + @@ -20,7 +16,7 @@ - + ee9 @@ -41,6 +37,7 @@ +