Merge remote-tracking branch 'origin/jetty-12.0.x' into jetty-12.1.x
# Conflicts: # jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionFlusher.java # jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java # jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java # jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ContextScopeListenerTest.java # jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java
This commit is contained in:
commit
5903efdf96
|
@ -414,6 +414,8 @@ jetty-10.0.18 - 26 October 2023
|
|||
+ 10537 HTTP/3: Incomplete Data Transfer When Used with Spring Boot WebFlux
|
||||
+ 10696 jetty.sh doesn't work with JETTY_USER in Jetty 10.0.17 thru Jetty
|
||||
12.0.2
|
||||
+ 10669 Provide ability to defer initial deployment of webapps until after
|
||||
Server has started
|
||||
+ 10705 Creating a `HTTP3ServerConnector` with a `SslContextFactory` that has
|
||||
a non-null `SSLContext` makes the server fail to start with an unclear error
|
||||
message
|
||||
|
|
|
@ -347,17 +347,10 @@ public class ContentDocs
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
// After every successful write, release the chunk.
|
||||
chunk.release();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -347,17 +347,10 @@ public class ContentDocs
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
// After every successful write, release the chunk.
|
||||
chunk.release();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -543,7 +543,7 @@ public abstract class HttpSender
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
boolean proceed = true;
|
||||
if (committed)
|
||||
|
@ -588,8 +588,6 @@ public abstract class HttpSender
|
|||
// There was some concurrent error, terminate.
|
||||
complete = true;
|
||||
}
|
||||
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -235,17 +235,9 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
release();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
release();
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -259,6 +251,7 @@ public class HttpSenderOverHTTP extends HttpSender
|
|||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
super.onCompleteFailure(cause);
|
||||
release();
|
||||
callback.failed(cause);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
<?xml version="1.0"?><!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://eclipse.dev/jetty/configure_10_0.dtd">
|
||||
|
||||
<!-- =============================================================== -->
|
||||
<!-- Create the deployment manager -->
|
||||
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
|
||||
<!-- The deplyment manager handles the lifecycle of deploying web -->
|
||||
<!-- applications. Apps are provided by instances of the -->
|
||||
<!-- AppProvider interface. -->
|
||||
<!-- Attach the "core" environment app deployment provider -->
|
||||
<!-- =============================================================== -->
|
||||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<Call id="DeploymentManager" name="getBean">
|
||||
|
@ -20,7 +16,7 @@
|
|||
<Ref refid="DeploymentManager">
|
||||
<Call name="addAppProvider">
|
||||
<Arg>
|
||||
<New id="WebAppProvider" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<New id="WebAppProvider-core" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<Set name="EnvironmentName">core</Set>
|
||||
<Set name="monitoredDirName">
|
||||
<Call name="resolvePath" class="org.eclipse.jetty.xml.XmlConfiguration">
|
||||
|
|
|
@ -101,12 +101,11 @@ public class Flusher
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (active != null)
|
||||
active.succeeded();
|
||||
active = null;
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -295,7 +295,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Written - entries processed/pending {}/{}: {}/{}",
|
||||
|
@ -304,7 +304,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
processedEntries,
|
||||
pendingEntries);
|
||||
finish();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
private void finish()
|
||||
|
|
|
@ -513,17 +513,15 @@ public class RawHTTP2ProxyTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
frameInfo.callback.succeeded();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable failure)
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
frameInfo.callback.failed(failure);
|
||||
super.failed(failure);
|
||||
frameInfo.callback.failed(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -669,17 +667,15 @@ public class RawHTTP2ProxyTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
frameInfo.callback.succeeded();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable failure)
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
frameInfo.callback.failed(failure);
|
||||
super.failed(failure);
|
||||
frameInfo.callback.failed(cause);
|
||||
}
|
||||
|
||||
private void offer(Stream stream, Frame frame, Callback callback)
|
||||
|
|
|
@ -108,7 +108,7 @@ public class ControlFlusher extends IteratingCallback
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("succeeded to write {} on {}", entries, this);
|
||||
|
@ -119,8 +119,6 @@ public class ControlFlusher extends IteratingCallback
|
|||
entries.clear();
|
||||
|
||||
invocationType = InvocationType.NON_BLOCKING;
|
||||
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -102,6 +102,15 @@ public class InstructionFlusher extends IteratingCallback
|
|||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("succeeded to write {} buffers on {}", accumulator.getByteBuffers().size(), this);
|
||||
|
||||
accumulator.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
|
|
|
@ -75,7 +75,7 @@ public class MessageFlusher extends IteratingCallback
|
|||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
int generated = generator.generate(accumulator, entry.endPoint.getStreamId(), frame, this::failed);
|
||||
int generated = generator.generate(accumulator, entry.endPoint.getStreamId(), frame, this::onGenerateFailure);
|
||||
if (generated < 0)
|
||||
return Action.SCHEDULED;
|
||||
|
||||
|
@ -88,33 +88,48 @@ public class MessageFlusher extends IteratingCallback
|
|||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
private void onGenerateFailure(Throwable cause)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed to generate {} on {}", entry, this, cause);
|
||||
|
||||
accumulator.release();
|
||||
|
||||
entry.callback.failed(cause);
|
||||
entry = null;
|
||||
|
||||
// Continue the iteration.
|
||||
succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("succeeded to write {} on {}", entry, this);
|
||||
|
||||
accumulator.release();
|
||||
|
||||
entry.callback.succeeded();
|
||||
entry = null;
|
||||
|
||||
super.succeeded();
|
||||
if (entry != null)
|
||||
{
|
||||
entry.callback.succeeded();
|
||||
entry = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failed to write {} on {}", entry, this, x);
|
||||
LOG.debug("failed to write {} on {}", entry, this, cause);
|
||||
|
||||
accumulator.release();
|
||||
|
||||
entry.callback.failed(x);
|
||||
entry = null;
|
||||
|
||||
// Continue the iteration.
|
||||
super.succeeded();
|
||||
if (entry != null)
|
||||
{
|
||||
entry.callback.failed(cause);
|
||||
entry = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,13 +20,13 @@ import java.nio.ByteBuffer;
|
|||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.IntUnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -205,24 +205,49 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
|
|||
|
||||
// No bucket, return non-pooled.
|
||||
if (bucket == null)
|
||||
return newRetainableByteBuffer(size, direct, null);
|
||||
return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct));
|
||||
|
||||
bucket.recordAcquire();
|
||||
|
||||
// Try to acquire a pooled entry.
|
||||
Pool.Entry<RetainableByteBuffer.Pooled> entry = bucket.getPool().acquire();
|
||||
if (entry != null)
|
||||
if (entry == null)
|
||||
{
|
||||
bucket.recordPooled();
|
||||
RetainableByteBuffer.Pooled buffer = entry.getPooled();
|
||||
((PooledBuffer)buffer).acquire();
|
||||
return buffer;
|
||||
ByteBuffer buffer = BufferUtil.allocate(bucket.getCapacity(), direct);
|
||||
return new ReservedBuffer(buffer, bucket);
|
||||
}
|
||||
|
||||
return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer));
|
||||
bucket.recordPooled();
|
||||
RetainableByteBuffer.Pooled buffer = entry.getPooled();
|
||||
((PooledBuffer)buffer).acquire();
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private void reserve(RetainedBucket bucket, RetainableByteBuffer.Pooled buffer)
|
||||
@Override
|
||||
public boolean removeAndRelease(RetainableByteBuffer buffer)
|
||||
{
|
||||
RetainableByteBuffer actual = buffer;
|
||||
while (actual instanceof RetainableByteBuffer.Wrapper wrapper)
|
||||
actual = wrapper.getWrapped();
|
||||
|
||||
if (actual instanceof ReservedBuffer reservedBuffer)
|
||||
{
|
||||
// remove the actual reserved buffer, but release the wrapped buffer
|
||||
reservedBuffer.remove();
|
||||
return buffer.release();
|
||||
}
|
||||
|
||||
if (actual instanceof PooledBuffer poolBuffer)
|
||||
{
|
||||
// remove the actual pool buffer, but release the wrapped buffer
|
||||
poolBuffer.remove();
|
||||
return buffer.release();
|
||||
}
|
||||
|
||||
return ByteBufferPool.super.removeAndRelease(buffer);
|
||||
}
|
||||
|
||||
private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer)
|
||||
{
|
||||
bucket.recordRelease();
|
||||
|
||||
|
@ -235,12 +260,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
|
|||
}
|
||||
|
||||
// Add the buffer to the new entry.
|
||||
ByteBuffer byteBuffer = buffer.getByteBuffer();
|
||||
BufferUtil.reset(byteBuffer);
|
||||
PooledBuffer pooledBuffer = new PooledBuffer(this, byteBuffer, b -> release(bucket, entry));
|
||||
PooledBuffer pooledBuffer = new PooledBuffer(byteBuffer, bucket, entry);
|
||||
if (entry.enable(pooledBuffer, false))
|
||||
{
|
||||
checkMaxMemory(bucket, buffer.isDirect());
|
||||
checkMaxMemory(bucket, byteBuffer.isDirect());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -270,6 +294,13 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
|
|||
entry.remove();
|
||||
}
|
||||
|
||||
private boolean remove(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer.Pooled> entry)
|
||||
{
|
||||
// Cannot release, discard this buffer.
|
||||
bucket.recordRemove();
|
||||
return entry.remove();
|
||||
}
|
||||
|
||||
private void checkMaxMemory(RetainedBucket bucket, boolean direct)
|
||||
{
|
||||
long max = direct ? _maxDirectMemory : _maxHeapMemory;
|
||||
|
@ -309,14 +340,6 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private RetainableByteBuffer.Pooled newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer.Pooled> releaser)
|
||||
{
|
||||
ByteBuffer buffer = BufferUtil.allocate(capacity, direct);
|
||||
PooledBuffer retainableByteBuffer = new PooledBuffer(this, buffer, releaser);
|
||||
retainableByteBuffer.acquire();
|
||||
return retainableByteBuffer;
|
||||
}
|
||||
|
||||
public Pool<RetainableByteBuffer.Pooled> poolFor(int capacity, boolean direct)
|
||||
{
|
||||
RetainedBucket bucket = bucketFor(capacity, direct);
|
||||
|
@ -581,20 +604,49 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
private static class PooledBuffer extends RetainableByteBuffer.Pooled
|
||||
private class ReservedBuffer extends RetainableByteBuffer.Pooled
|
||||
{
|
||||
private final RetainedBucket _bucket;
|
||||
private final AtomicBoolean _removed = new AtomicBoolean();
|
||||
|
||||
private ReservedBuffer(ByteBuffer buffer, RetainedBucket bucket)
|
||||
{
|
||||
super(ArrayByteBufferPool.this, buffer);
|
||||
_bucket = Objects.requireNonNull(bucket);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release()
|
||||
{
|
||||
boolean released = super.release();
|
||||
if (released && _removed.compareAndSet(false, true))
|
||||
reserve(_bucket, getByteBuffer());
|
||||
return released;
|
||||
}
|
||||
|
||||
void remove()
|
||||
{
|
||||
// Buffer never added to pool, so just prevent future reservation
|
||||
_removed.compareAndSet(false, true);
|
||||
}
|
||||
}
|
||||
|
||||
private class PooledBuffer extends RetainableByteBuffer.Pooled
|
||||
{
|
||||
private final Consumer<Pooled> _releaser;
|
||||
private final ReferenceCounter _referenceCounter;
|
||||
private final RetainedBucket _bucket;
|
||||
private final Pool.Entry<RetainableByteBuffer.Pooled> _entry;
|
||||
private int _usages;
|
||||
|
||||
private PooledBuffer(ByteBufferPool pool, ByteBuffer buffer, Consumer<Pooled> releaser)
|
||||
private PooledBuffer(ByteBuffer buffer, RetainedBucket bucket, Pool.Entry<RetainableByteBuffer.Pooled> entry)
|
||||
{
|
||||
super(pool, buffer, new ReferenceCounter(0));
|
||||
super(ArrayByteBufferPool.this, buffer, new ReferenceCounter(0));
|
||||
if (getWrapped() instanceof ReferenceCounter referenceCounter)
|
||||
_referenceCounter = referenceCounter;
|
||||
else
|
||||
throw new IllegalArgumentException();
|
||||
this._releaser = releaser;
|
||||
_bucket = Objects.requireNonNull(bucket);
|
||||
_entry = Objects.requireNonNull(entry);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -602,13 +654,15 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
|
|||
{
|
||||
boolean released = super.release();
|
||||
if (released)
|
||||
{
|
||||
if (_releaser != null)
|
||||
_releaser.accept(this);
|
||||
}
|
||||
ArrayByteBufferPool.this.release(_bucket, _entry);
|
||||
return released;
|
||||
}
|
||||
|
||||
void remove()
|
||||
{
|
||||
ArrayByteBufferPool.this.remove(_bucket, _entry);
|
||||
}
|
||||
|
||||
private int use()
|
||||
{
|
||||
if (++_usages < 0)
|
||||
|
|
|
@ -58,6 +58,20 @@ public interface ByteBufferPool
|
|||
*/
|
||||
RetainableByteBuffer.Mutable acquire(int size, boolean direct);
|
||||
|
||||
/**
|
||||
* {@link RetainableByteBuffer#release() Release} the buffer in a way that will remove it from any pool that it may be in.
|
||||
* If the buffer is not in a pool, calling this method is equivalent to calling {@link RetainableByteBuffer#release()}.
|
||||
* Calling this method satisfies any contract that requires a call to {@link RetainableByteBuffer#release()}.
|
||||
* @return {@code true} if a call to {@link RetainableByteBuffer#release()} would have returned {@code true}.
|
||||
* @see RetainableByteBuffer#release()
|
||||
* @deprecated This API is experimental and may be removed in future releases
|
||||
*/
|
||||
@Deprecated
|
||||
default boolean removeAndRelease(RetainableByteBuffer buffer)
|
||||
{
|
||||
return buffer != null && buffer.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Removes all {@link RetainableByteBuffer#isRetained() non-retained}
|
||||
* pooled instances from this pool.</p>
|
||||
|
@ -161,7 +175,7 @@ public interface ByteBufferPool
|
|||
@Override
|
||||
public RetainableByteBuffer.Mutable acquire(int size, boolean direct)
|
||||
{
|
||||
return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct)).asMutable();
|
||||
return RetainableByteBuffer.wrap(BufferUtil.allocate(size, direct));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -92,7 +92,7 @@ public interface RetainableByteBuffer extends Retainable
|
|||
* @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer}
|
||||
* @see ByteBufferPool#NON_POOLING
|
||||
*/
|
||||
static RetainableByteBuffer wrap(ByteBuffer byteBuffer)
|
||||
static RetainableByteBuffer.Mutable wrap(ByteBuffer byteBuffer)
|
||||
{
|
||||
return new FixedCapacity(byteBuffer);
|
||||
}
|
||||
|
@ -106,7 +106,7 @@ public interface RetainableByteBuffer extends Retainable
|
|||
* @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer}
|
||||
* @see ByteBufferPool#NON_POOLING
|
||||
*/
|
||||
static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Retainable retainable)
|
||||
static RetainableByteBuffer.Mutable wrap(ByteBuffer byteBuffer, Retainable retainable)
|
||||
{
|
||||
return new FixedCapacity(byteBuffer, retainable);
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ public interface RetainableByteBuffer extends Retainable
|
|||
* @param releaser a {@link Runnable} to call when the buffer is released.
|
||||
* @return a {@link FixedCapacity} buffer wrapping the passed {@link ByteBuffer}
|
||||
*/
|
||||
static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Runnable releaser)
|
||||
static RetainableByteBuffer.Mutable wrap(ByteBuffer byteBuffer, Runnable releaser)
|
||||
{
|
||||
return new FixedCapacity(byteBuffer)
|
||||
{
|
||||
|
@ -2009,7 +2009,7 @@ public interface RetainableByteBuffer extends Retainable
|
|||
byteBuffer.limit(limit + Math.toIntExact(space));
|
||||
byteBuffer = byteBuffer.slice();
|
||||
byteBuffer.limit(limit);
|
||||
_aggregate = RetainableByteBuffer.wrap(byteBuffer, _aggregate).asMutable();
|
||||
_aggregate = RetainableByteBuffer.wrap(byteBuffer, _aggregate);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.not;
|
|||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -445,4 +446,43 @@ public class ArrayByteBufferPoolTest
|
|||
assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE));
|
||||
assertThat(compoundPool.getSecondaryPool().size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAndRelease()
|
||||
{
|
||||
ArrayByteBufferPool pool = new ArrayByteBufferPool();
|
||||
|
||||
RetainableByteBuffer reserved0 = pool.acquire(1024, false);
|
||||
RetainableByteBuffer reserved1 = pool.acquire(1024, false);
|
||||
|
||||
RetainableByteBuffer acquired0 = pool.acquire(1024, false);
|
||||
acquired0.release();
|
||||
acquired0 = pool.acquire(1024, false);
|
||||
RetainableByteBuffer acquired1 = pool.acquire(1024, false);
|
||||
acquired1.release();
|
||||
acquired1 = pool.acquire(1024, false);
|
||||
|
||||
RetainableByteBuffer retained0 = pool.acquire(1024, false);
|
||||
retained0.release();
|
||||
retained0 = pool.acquire(1024, false);
|
||||
retained0.retain();
|
||||
RetainableByteBuffer retained1 = pool.acquire(1024, false);
|
||||
retained1.release();
|
||||
retained1 = pool.acquire(1024, false);
|
||||
retained1.retain();
|
||||
|
||||
assertTrue(pool.removeAndRelease(reserved1));
|
||||
assertTrue(pool.removeAndRelease(acquired1));
|
||||
assertFalse(pool.removeAndRelease(retained1));
|
||||
assertTrue(retained1.release());
|
||||
|
||||
assertThat(pool.getHeapByteBufferCount(), is(2L));
|
||||
assertTrue(reserved0.release());
|
||||
assertThat(pool.getHeapByteBufferCount(), is(3L));
|
||||
assertTrue(acquired0.release());
|
||||
assertThat(pool.getHeapByteBufferCount(), is(3L));
|
||||
assertFalse(retained0.release());
|
||||
assertTrue(retained0.release());
|
||||
assertThat(pool.getHeapByteBufferCount(), is(3L));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,12 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.EventListener;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -39,7 +40,6 @@ import org.eclipse.jetty.util.BufferUtil;
|
|||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
|
||||
|
@ -344,26 +344,19 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
|
||||
private class Flusher extends IteratingCallback
|
||||
{
|
||||
private final AutoLock lock = new AutoLock();
|
||||
private final ArrayDeque<Entry> queue = new ArrayDeque<>();
|
||||
private final Queue<Entry> queue = new ConcurrentLinkedQueue<>();
|
||||
private Entry entry;
|
||||
|
||||
public void offer(Callback callback, SocketAddress address, ByteBuffer[] buffers)
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
queue.offer(new Entry(callback, address, buffers));
|
||||
}
|
||||
queue.offer(new Entry(callback, address, buffers));
|
||||
iterate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process()
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
entry = queue.poll();
|
||||
}
|
||||
entry = queue.poll();
|
||||
if (entry == null)
|
||||
return Action.IDLE;
|
||||
|
||||
|
@ -372,17 +365,9 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
entry.callback.succeeded();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
entry.callback.failed(x);
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -394,10 +379,11 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
@Override
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
entry.callback.failed(cause);
|
||||
QuicConnection.this.close();
|
||||
}
|
||||
|
||||
private class Entry
|
||||
private static class Entry
|
||||
{
|
||||
private final Callback callback;
|
||||
private final SocketAddress address;
|
||||
|
|
|
@ -521,12 +521,11 @@ public abstract class QuicSession extends ContainerLifeCycle
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("written cipher bytes on {}", QuicSession.this);
|
||||
cipherBuffer.release();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -760,17 +760,11 @@ public class ConnectHandler extends Handler.Wrapper
|
|||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
protected void onSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Wrote {} bytes {}", filled, TunnelConnection.this);
|
||||
buffer.release();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.http.HttpException;
|
||||
|
@ -39,14 +40,15 @@ import org.eclipse.jetty.http.MimeTypes.Type;
|
|||
import org.eclipse.jetty.http.PreEncodedHttpField;
|
||||
import org.eclipse.jetty.http.QuotedQualityCSV;
|
||||
import org.eclipse.jetty.io.ByteBufferOutputStream;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.Retainable;
|
||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.util.Attributes;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.ExceptionUtil;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
@ -198,7 +200,8 @@ public class ErrorHandler implements Request.Handler
|
|||
|
||||
int bufferSize = request.getConnectionMetaData().getHttpConfiguration().getOutputBufferSize();
|
||||
bufferSize = Math.min(8192, bufferSize); // TODO ?
|
||||
RetainableByteBuffer buffer = request.getComponents().getByteBufferPool().acquire(bufferSize, false);
|
||||
ByteBufferPool byteBufferPool = request.getComponents().getByteBufferPool();
|
||||
RetainableByteBuffer buffer = byteBufferPool.acquire(bufferSize, false);
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -251,13 +254,14 @@ public class ErrorHandler implements Request.Handler
|
|||
}
|
||||
|
||||
response.getHeaders().put(type.getContentTypeField(charset));
|
||||
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, buffer));
|
||||
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, byteBufferPool, buffer));
|
||||
|
||||
return true;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
buffer.release();
|
||||
if (buffer != null)
|
||||
byteBufferPool.removeAndRelease(buffer);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
|
@ -579,20 +583,33 @@ public class ErrorHandler implements Request.Handler
|
|||
* when calling {@link Response#write(boolean, ByteBuffer, Callback)} to wrap the passed in {@link Callback}
|
||||
* so that the {@link RetainableByteBuffer} used can be released.
|
||||
*/
|
||||
private static class WriteErrorCallback extends Callback.Nested
|
||||
private static class WriteErrorCallback implements Callback
|
||||
{
|
||||
private final Retainable _retainable;
|
||||
private final AtomicReference<Callback> _callback;
|
||||
private final ByteBufferPool _pool;
|
||||
private final RetainableByteBuffer _buffer;
|
||||
|
||||
public WriteErrorCallback(Callback callback, Retainable retainable)
|
||||
public WriteErrorCallback(Callback callback, ByteBufferPool pool, RetainableByteBuffer retainable)
|
||||
{
|
||||
super(callback);
|
||||
_retainable = retainable;
|
||||
_callback = new AtomicReference<>(callback);
|
||||
_pool = pool;
|
||||
_buffer = retainable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed()
|
||||
public void succeeded()
|
||||
{
|
||||
_retainable.release();
|
||||
Callback callback = _callback.getAndSet(null);
|
||||
if (callback != null)
|
||||
ExceptionUtil.callAndThen(_buffer::release, callback::succeeded);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
Callback callback = _callback.getAndSet(null);
|
||||
if (callback != null)
|
||||
ExceptionUtil.callAndThen(x, t -> _pool.removeAndRelease(_buffer), callback::failed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -237,7 +237,8 @@ public class CustomTransportTest
|
|||
channels.put(channel.id, channel);
|
||||
|
||||
// Register for read interest with the EndPoint.
|
||||
endPoint.fillInterested(new EndPointToChannelCallback(channel));
|
||||
EndPointToChannelCallback endPointToChannelCallback = new EndPointToChannelCallback(channel);
|
||||
endPoint.fillInterested(Callback.from(endPointToChannelCallback::iterate));
|
||||
}
|
||||
|
||||
// Called when there data to read from the Gateway on the given Channel.
|
||||
|
@ -322,18 +323,10 @@ public class CustomTransportTest
|
|||
endPoint.fillInterested(this);
|
||||
return Action.IDLE;
|
||||
}
|
||||
channel.write(this, buffer);
|
||||
channel.write(Callback.from(this::iterate), buffer);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
// There is data to read from the EndPoint.
|
||||
// Iterate to read it and send it to the Gateway.
|
||||
iterate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
|
|
|
@ -167,6 +167,18 @@ public abstract class IteratingCallback implements Callback
|
|||
*/
|
||||
protected abstract Action process() throws Throwable;
|
||||
|
||||
/**
|
||||
* Invoked when one task has completed successfully, either by the
|
||||
* caller thread or by the processing thread. This invocation is
|
||||
* always serialized w.r.t the execution of {@link #process()}.
|
||||
* <p>
|
||||
* This method is not invoked when a call to {@link #abort(Throwable)}
|
||||
* is made before the {@link #succeeded()} callback happens.
|
||||
*/
|
||||
protected void onSuccess()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked when the overall task has completed successfully.
|
||||
*
|
||||
|
@ -255,6 +267,7 @@ public abstract class IteratingCallback implements Callback
|
|||
// Fall through to possibly invoke onCompleteFailure().
|
||||
}
|
||||
|
||||
boolean callOnSuccess = false;
|
||||
// acted on the action we have just received
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
|
@ -305,6 +318,7 @@ public abstract class IteratingCallback implements Callback
|
|||
|
||||
case CALLED:
|
||||
{
|
||||
callOnSuccess = true;
|
||||
if (action != Action.SCHEDULED)
|
||||
throw new IllegalStateException(String.format("%s[action=%s]", this, action));
|
||||
// we lost the race, so we have to keep processing
|
||||
|
@ -327,6 +341,11 @@ public abstract class IteratingCallback implements Callback
|
|||
throw new IllegalStateException(String.format("%s[action=%s]", this, action));
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (callOnSuccess)
|
||||
onSuccess();
|
||||
}
|
||||
}
|
||||
|
||||
if (notifyCompleteSuccess)
|
||||
|
@ -338,8 +357,11 @@ public abstract class IteratingCallback implements Callback
|
|||
/**
|
||||
* Method to invoke when the asynchronous sub-task succeeds.
|
||||
* <p>
|
||||
* Subclasses that override this method must always remember
|
||||
* to call {@code super.succeeded()}.
|
||||
* This method should be considered final for all practical purposes.
|
||||
* <p>
|
||||
* Eventually, {@link #onSuccess()} is
|
||||
* called, either by the caller thread or by the processing
|
||||
* thread.
|
||||
*/
|
||||
@Override
|
||||
public void succeeded()
|
||||
|
@ -374,7 +396,10 @@ public abstract class IteratingCallback implements Callback
|
|||
}
|
||||
}
|
||||
if (process)
|
||||
{
|
||||
onSuccess();
|
||||
processing();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -382,8 +407,7 @@ public abstract class IteratingCallback implements Callback
|
|||
* or to fail the overall asynchronous task and therefore
|
||||
* terminate the iteration.
|
||||
* <p>
|
||||
* Subclasses that override this method must always remember
|
||||
* to call {@code super.failed(Throwable)}.
|
||||
* This method should be considered final for all practical purposes.
|
||||
* <p>
|
||||
* Eventually, {@link #onCompleteFailure(Throwable)} is
|
||||
* called, either by the caller thread or by the processing
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class IteratingCallbackTest
|
||||
|
@ -435,4 +436,28 @@ public class IteratingCallbackTest
|
|||
|
||||
assertEquals(1, count.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnSuccessCalledDespiteISE() throws Exception
|
||||
{
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
IteratingCallback icb = new IteratingCallback()
|
||||
{
|
||||
@Override
|
||||
protected Action process()
|
||||
{
|
||||
succeeded();
|
||||
return Action.IDLE; // illegal action
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onSuccess()
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
assertThrows(IllegalStateException.class, icb::iterate);
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
super.failed(x);
|
||||
onError(x);
|
||||
onError(cause);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<Arg><Property name="jetty.quickstart.mode"/></Arg>
|
||||
</Call>
|
||||
|
||||
<Ref refid="WebAppProvider">
|
||||
<Ref refid="WebAppProvider-ee10">
|
||||
<Get name="properties">
|
||||
<Put name="jetty.deploy.attribute.org.eclipse.jetty.quickstart.mode"><Property name="jetty.quickstart.mode"/></Put>
|
||||
<Put name="jetty.deploy.attribute.org.eclipse.jetty.quickstart.origin"><Property name="jetty.quickstart.origin"/></Put>
|
||||
|
|
|
@ -132,6 +132,9 @@
|
|||
<configuration>
|
||||
<debug>${it.debug}</debug>
|
||||
<addTestClassPath>true</addTestClassPath>
|
||||
<extraArtifacts>
|
||||
<extraArtifact>org.eclipse.jetty:jetty-client:${project.version}</extraArtifact>
|
||||
</extraArtifacts>
|
||||
<junitPackageName>org.eclipse.jetty.maven.its.ee10.runner</junitPackageName>
|
||||
<scope>test</scope>
|
||||
<scriptVariables>
|
||||
|
|
|
@ -135,6 +135,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
private long _written;
|
||||
private long _flushed;
|
||||
private long _firstByteNanoTime = -1;
|
||||
private ByteBufferPool _pool;
|
||||
private RetainableByteBuffer _aggregate;
|
||||
private int _bufferSize;
|
||||
private int _commitSize;
|
||||
|
@ -222,7 +223,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_state = State.CLOSED;
|
||||
closedCallback = _closedCallback;
|
||||
_closedCallback = null;
|
||||
releaseBuffer();
|
||||
lockedReleaseBuffer(failure != null);
|
||||
wake = updateApiState(failure);
|
||||
}
|
||||
else if (_state == State.CLOSE)
|
||||
|
@ -444,7 +445,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
try (AutoLock ignored = _channelState.lock())
|
||||
{
|
||||
_state = State.CLOSED;
|
||||
releaseBuffer();
|
||||
lockedReleaseBuffer(failure != null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -576,25 +577,36 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
try (AutoLock ignored = _channelState.lock())
|
||||
{
|
||||
return acquireBuffer().getByteBuffer();
|
||||
return lockedAcquireBuffer().getByteBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
private RetainableByteBuffer acquireBuffer()
|
||||
private RetainableByteBuffer lockedAcquireBuffer()
|
||||
{
|
||||
assert _channelState.isLockHeldByCurrentThread();
|
||||
|
||||
boolean useOutputDirectByteBuffers = _servletChannel.getConnectionMetaData().getHttpConfiguration().isUseOutputDirectByteBuffers();
|
||||
ByteBufferPool pool = _servletChannel.getRequest().getComponents().getByteBufferPool();
|
||||
|
||||
if (_aggregate == null)
|
||||
_aggregate = pool.acquire(getBufferSize(), useOutputDirectByteBuffers);
|
||||
{
|
||||
_pool = _servletChannel.getRequest().getComponents().getByteBufferPool();
|
||||
_aggregate = _pool.acquire(getBufferSize(), useOutputDirectByteBuffers);
|
||||
}
|
||||
return _aggregate;
|
||||
}
|
||||
|
||||
private void releaseBuffer()
|
||||
private void lockedReleaseBuffer(boolean failure)
|
||||
{
|
||||
assert _channelState.isLockHeldByCurrentThread();
|
||||
|
||||
if (_aggregate != null)
|
||||
{
|
||||
_aggregate.release();
|
||||
if (failure && _pool != null)
|
||||
_pool.removeAndRelease(_aggregate);
|
||||
else
|
||||
_aggregate.release();
|
||||
_aggregate = null;
|
||||
_pool = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -752,7 +764,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
// Should we aggregate?
|
||||
if (aggregate)
|
||||
{
|
||||
acquireBuffer();
|
||||
lockedAcquireBuffer();
|
||||
int filled = BufferUtil.fill(_aggregate.getByteBuffer(), b, off, len);
|
||||
|
||||
// return if we are not complete, not full and filled all the content
|
||||
|
@ -957,7 +969,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
}
|
||||
_written = written;
|
||||
|
||||
acquireBuffer();
|
||||
lockedAcquireBuffer();
|
||||
BufferUtil.append(_aggregate.getByteBuffer(), (byte)b);
|
||||
}
|
||||
|
||||
|
@ -1260,6 +1272,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
try (AutoLock ignored = _channelState.lock())
|
||||
{
|
||||
lockedReleaseBuffer(_state != State.CLOSED);
|
||||
_state = State.OPEN;
|
||||
_apiState = ApiState.BLOCKING;
|
||||
_softClose = true; // Stay closed until next request
|
||||
|
@ -1268,7 +1281,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_commitSize = config.getOutputAggregationSize();
|
||||
if (_commitSize > _bufferSize)
|
||||
_commitSize = _bufferSize;
|
||||
releaseBuffer();
|
||||
_written = 0;
|
||||
_writeListener = null;
|
||||
_onError = null;
|
||||
|
|
|
@ -227,6 +227,11 @@ public class ServletChannelState
|
|||
return _lock.lock();
|
||||
}
|
||||
|
||||
boolean isLockHeldByCurrentThread()
|
||||
{
|
||||
return _lock.isHeldByCurrentThread();
|
||||
}
|
||||
|
||||
public State getState()
|
||||
{
|
||||
try (AutoLock ignored = lock())
|
||||
|
|
|
@ -19,7 +19,6 @@ import java.util.List;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import jakarta.servlet.AsyncContext;
|
||||
import jakarta.servlet.DispatcherType;
|
||||
|
@ -73,7 +72,7 @@ public class ContextScopeListenerTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncServlet() throws Exception
|
||||
public void testAsyncServletAsyncBeforeDoGetExit() throws Exception
|
||||
{
|
||||
_contextHandler.addServlet(new ServletHolder(new HttpServlet()
|
||||
{
|
||||
|
@ -87,12 +86,25 @@ public class ContextScopeListenerTest
|
|||
}
|
||||
|
||||
_history.add("doGet");
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AsyncContext asyncContext = req.startAsync();
|
||||
asyncContext.start(() ->
|
||||
{
|
||||
_history.add("asyncRunnable");
|
||||
asyncContext.dispatch("/dispatch");
|
||||
latch.countDown();
|
||||
// wait until doGet call has exited
|
||||
Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> _history.get(_history.size() - 1).equals("exitScope /initialPath"));
|
||||
});
|
||||
|
||||
try
|
||||
{
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}), "/");
|
||||
|
||||
|
@ -100,13 +112,9 @@ public class ContextScopeListenerTest
|
|||
|
||||
_contextHandler.addEventListener(new ContextHandler.ContextScopeListener()
|
||||
{
|
||||
// Use a lock to prevent the async thread running the listener concurrently.
|
||||
private final ReentrantLock _lock = new ReentrantLock();
|
||||
|
||||
@Override
|
||||
public void enterScope(Context context, Request request)
|
||||
{
|
||||
_lock.lock();
|
||||
String pathInContext = (request == null) ? "null" : Request.getPathInContext(request);
|
||||
_history.add("enterScope " + pathInContext);
|
||||
}
|
||||
|
@ -129,9 +137,81 @@ public class ContextScopeListenerTest
|
|||
assertHistory(
|
||||
"enterScope /initialPath",
|
||||
"doGet",
|
||||
"exitScope /initialPath",
|
||||
"enterScope /initialPath",
|
||||
"asyncRunnable",
|
||||
"asyncDispatch",
|
||||
"exitScope /initialPath",
|
||||
"exitScope /initialPath"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncServletAsyncAfterDoGetExit() throws Exception
|
||||
{
|
||||
_contextHandler.addServlet(new ServletHolder(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
|
||||
{
|
||||
if (req.getDispatcherType() == DispatcherType.ASYNC)
|
||||
{
|
||||
_history.add("asyncDispatch");
|
||||
return;
|
||||
}
|
||||
|
||||
_history.add("doGet");
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AsyncContext asyncContext = req.startAsync();
|
||||
asyncContext.start(() ->
|
||||
{
|
||||
latch.countDown();
|
||||
// wait until doGet call has exited
|
||||
Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> _history.get(_history.size() - 1).equals("exitScope /initialPath"));
|
||||
|
||||
_history.add("asyncRunnable");
|
||||
asyncContext.dispatch("/dispatch");
|
||||
});
|
||||
|
||||
try
|
||||
{
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}), "/");
|
||||
|
||||
_contextHandler.addEventListener(new ContextHandler.ContextScopeListener()
|
||||
{
|
||||
@Override
|
||||
public void enterScope(Context context, Request request)
|
||||
{
|
||||
String pathInContext = (request == null) ? "null" : Request.getPathInContext(request);
|
||||
_history.add("enterScope " + pathInContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exitScope(Context context, Request request)
|
||||
{
|
||||
String pathInContext = (request == null) ? "null" : Request.getPathInContext(request);
|
||||
_history.add("exitScope " + pathInContext);
|
||||
}
|
||||
});
|
||||
|
||||
URI uri = URI.create("http://localhost:" + _connector.getLocalPort() + "/initialPath");
|
||||
ContentResponse response = _client.GET(uri);
|
||||
assertThat(response.getStatus(), equalTo(HttpStatus.OK_200));
|
||||
|
||||
Awaitility.waitAtMost(5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> _history.size() == 9);
|
||||
assertHistory(
|
||||
"enterScope /initialPath",
|
||||
"doGet",
|
||||
"enterScope /initialPath",
|
||||
"exitScope /initialPath",
|
||||
"asyncRunnable",
|
||||
"exitScope /initialPath",
|
||||
"enterScope /initialPath",
|
||||
"asyncDispatch",
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
<?xml version="1.0"?><!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://www.eclipse.org/jetty/configure_10_0.dtd">
|
||||
|
||||
<!-- =============================================================== -->
|
||||
<!-- Create the deployment manager -->
|
||||
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
|
||||
<!-- The deplyment manager handles the lifecycle of deploying web -->
|
||||
<!-- applications. Apps are provided by instances of the -->
|
||||
<!-- AppProvider interface. -->
|
||||
<!-- Attach the "ee10" environment app deployment provider -->
|
||||
<!-- =============================================================== -->
|
||||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<Call id="DeploymentManager" name="getBean">
|
||||
|
@ -20,7 +16,7 @@
|
|||
<Ref refid="DeploymentManager">
|
||||
<Call name="addAppProvider">
|
||||
<Arg>
|
||||
<New id="WebAppProvider" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<New id="WebAppProvider-ee10" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<Set name="environmentName">ee10</Set>
|
||||
<Set name="monitoredDirName">
|
||||
<Call name="resolvePath" class="org.eclipse.jetty.xml.XmlConfiguration">
|
||||
|
@ -41,6 +37,7 @@
|
|||
</Default>
|
||||
</Property>
|
||||
</Set>
|
||||
<Set name="deferInitialScan" property="jetty.deploy.deferInitialScan"/>
|
||||
<Set name="scanInterval" property="jetty.deploy.scanInterval"/>
|
||||
<Set name="extractWars" property="jetty.deploy.extractWars" />
|
||||
<Set name="parentLoaderPriority" property="jetty.deploy.parentLoaderPriority" />
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<Arg><Property name="jetty.quickstart.mode"/></Arg>
|
||||
</Call>
|
||||
|
||||
<Ref refid="WebAppProvider">
|
||||
<Ref refid="WebAppProvider-ee8">
|
||||
<Get name="properties">
|
||||
<Put name="jetty.deploy.attribute.org.eclipse.jetty.quickstart.mode"><Property name="jetty.quickstart.mode"/></Put>
|
||||
<Put name="jetty.deploy.attribute.org.eclipse.jetty.quickstart.origin"><Property name="jetty.quickstart.origin"/></Put>
|
||||
|
|
|
@ -136,6 +136,9 @@
|
|||
<configuration>
|
||||
<debug>${it.debug}</debug>
|
||||
<addTestClassPath>true</addTestClassPath>
|
||||
<extraArtifacts>
|
||||
<extraArtifact>org.eclipse.jetty:jetty-client:${project.version}</extraArtifact>
|
||||
</extraArtifacts>
|
||||
<junitPackageName>org.eclipse.jetty.maven.its.ee8.runner</junitPackageName>
|
||||
<scope>test</scope>
|
||||
<scriptVariables>
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
<?xml version="1.0"?><!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://eclipse.dev/jetty/configure_10_0.dtd">
|
||||
|
||||
<!-- =============================================================== -->
|
||||
<!-- Create the deployment manager -->
|
||||
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
|
||||
<!-- The deplyment manager handles the lifecycle of deploying web -->
|
||||
<!-- applications. Apps are provided by instances of the -->
|
||||
<!-- AppProvider interface. -->
|
||||
<!-- Attach the "ee8" environment app deployment provider -->
|
||||
<!-- =============================================================== -->
|
||||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<Call id="DeploymentManager" name="getBean">
|
||||
|
@ -20,7 +16,7 @@
|
|||
<Ref refid="DeploymentManager">
|
||||
<Call name="addAppProvider">
|
||||
<Arg>
|
||||
<New id="WebAppProvider" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<New id="WebAppProvider-ee8" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<Set name="EnvironmentName">ee8</Set>
|
||||
<Set name="monitoredDirName">
|
||||
<Call name="resolvePath" class="org.eclipse.jetty.xml.XmlConfiguration">
|
||||
|
|
|
@ -158,6 +158,11 @@ public class HttpChannelState
|
|||
return _lock.lock();
|
||||
}
|
||||
|
||||
boolean isLockHeldByCurrentThread()
|
||||
{
|
||||
return _lock.isHeldByCurrentThread();
|
||||
}
|
||||
|
||||
public State getState()
|
||||
{
|
||||
try (AutoLock l = lock())
|
||||
|
|
|
@ -32,6 +32,7 @@ import jakarta.servlet.ServletRequest;
|
|||
import jakarta.servlet.ServletResponse;
|
||||
import jakarta.servlet.WriteListener;
|
||||
import org.eclipse.jetty.http.content.HttpContent;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
import org.eclipse.jetty.io.IOResources;
|
||||
|
@ -198,6 +199,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
private long _written;
|
||||
private long _flushed;
|
||||
private long _firstByteNanoTime = -1;
|
||||
private ByteBufferPool _pool;
|
||||
private RetainableByteBuffer _aggregate;
|
||||
private int _bufferSize;
|
||||
private int _commitSize;
|
||||
|
@ -299,7 +301,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_state = State.CLOSED;
|
||||
closedCallback = _closedCallback;
|
||||
_closedCallback = null;
|
||||
releaseBuffer();
|
||||
lockedReleaseBuffer(failure != null);
|
||||
wake = updateApiState(failure);
|
||||
}
|
||||
else if (_state == State.CLOSE)
|
||||
|
@ -520,7 +522,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
try (AutoLock l = _channelState.lock())
|
||||
{
|
||||
_state = State.CLOSED;
|
||||
releaseBuffer();
|
||||
lockedReleaseBuffer(failure != null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -649,23 +651,34 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
try (AutoLock l = _channelState.lock())
|
||||
{
|
||||
return acquireBuffer().getByteBuffer();
|
||||
return lockedAcquireBuffer().getByteBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
private RetainableByteBuffer acquireBuffer()
|
||||
private RetainableByteBuffer lockedAcquireBuffer()
|
||||
{
|
||||
assert _channelState.isLockHeldByCurrentThread();
|
||||
|
||||
if (_aggregate == null)
|
||||
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers());
|
||||
{
|
||||
_pool = _channel.getByteBufferPool();
|
||||
_aggregate = _pool.acquire(getBufferSize(), _channel.isUseOutputDirectByteBuffers());
|
||||
}
|
||||
return _aggregate;
|
||||
}
|
||||
|
||||
private void releaseBuffer()
|
||||
private void lockedReleaseBuffer(boolean failure)
|
||||
{
|
||||
assert _channelState.isLockHeldByCurrentThread();
|
||||
|
||||
if (_aggregate != null)
|
||||
{
|
||||
_aggregate.release();
|
||||
if (failure && _pool != null)
|
||||
_pool.removeAndRelease(_aggregate);
|
||||
else
|
||||
_aggregate.release();
|
||||
_aggregate = null;
|
||||
_pool = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -828,7 +841,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
// Should we aggregate?
|
||||
if (aggregate)
|
||||
{
|
||||
acquireBuffer();
|
||||
lockedAcquireBuffer();
|
||||
int filled = BufferUtil.fill(_aggregate.getByteBuffer(), b, off, len);
|
||||
|
||||
// return if we are not complete, not full and filled all the content
|
||||
|
@ -1033,7 +1046,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
}
|
||||
_written = written;
|
||||
|
||||
acquireBuffer();
|
||||
lockedAcquireBuffer();
|
||||
BufferUtil.append(_aggregate.getByteBuffer(), (byte)b);
|
||||
}
|
||||
|
||||
|
@ -1438,6 +1451,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
try (AutoLock l = _channelState.lock())
|
||||
{
|
||||
lockedReleaseBuffer(_state != State.CLOSED);
|
||||
_state = State.OPEN;
|
||||
_apiState = ApiState.BLOCKING;
|
||||
_softClose = true; // Stay closed until next request
|
||||
|
@ -1447,7 +1461,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_commitSize = config.getOutputAggregationSize();
|
||||
if (_commitSize > _bufferSize)
|
||||
_commitSize = _bufferSize;
|
||||
releaseBuffer();
|
||||
_written = 0;
|
||||
_writeListener = null;
|
||||
_onError = null;
|
||||
|
|
|
@ -192,10 +192,9 @@ public class AsyncProxyServlet extends ProxyServlet
|
|||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
super.failed(x);
|
||||
onError(x);
|
||||
onError(cause);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<Arg><Property name="jetty.quickstart.mode"/></Arg>
|
||||
</Call>
|
||||
|
||||
<Ref refid="WebAppProvider">
|
||||
<Ref refid="WebAppProvider-ee9">
|
||||
<Get name="properties">
|
||||
<Put name="jetty.deploy.attribute.org.eclipse.jetty.quickstart.mode"><Property name="jetty.quickstart.mode"/></Put>
|
||||
<Put name="jetty.deploy.attribute.org.eclipse.jetty.quickstart.origin"><Property name="jetty.quickstart.origin"/></Put>
|
||||
|
|
|
@ -134,6 +134,9 @@
|
|||
<configuration>
|
||||
<debug>${it.debug}</debug>
|
||||
<addTestClassPath>true</addTestClassPath>
|
||||
<extraArtifacts>
|
||||
<extraArtifact>org.eclipse.jetty:jetty-client:${project.version}</extraArtifact>
|
||||
</extraArtifacts>
|
||||
<junitPackageName>org.eclipse.jetty.maven.its.ee9.runner</junitPackageName>
|
||||
<scope>test</scope>
|
||||
<scriptVariables>
|
||||
|
|
|
@ -17,7 +17,8 @@ import java.net.URI;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import jakarta.servlet.AsyncContext;
|
||||
import jakarta.servlet.DispatcherType;
|
||||
|
@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ContextScopeListenerTest
|
||||
{
|
||||
|
@ -83,24 +85,31 @@ public class ContextScopeListenerTest
|
|||
}
|
||||
|
||||
_history.add("doGet");
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AsyncContext asyncContext = req.startAsync();
|
||||
asyncContext.start(() ->
|
||||
{
|
||||
_history.add("asyncRunnable");
|
||||
asyncContext.dispatch("/dispatch");
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
try
|
||||
{
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}), "/");
|
||||
|
||||
_contextHandler.addEventListener(new ContextHandler.ContextScopeListener()
|
||||
{
|
||||
// Use a lock to prevent the async thread running the listener concurrently.
|
||||
private final ReentrantLock _lock = new ReentrantLock();
|
||||
|
||||
@Override
|
||||
public void enterScope(ContextHandler.APIContext context, org.eclipse.jetty.ee9.nested.Request request, Object reason)
|
||||
{
|
||||
_lock.lock();
|
||||
String pathInContext = (request == null) ? "null" : URIUtil.addPaths(request.getServletPath(), request.getPathInfo());
|
||||
_history.add("enterScope " + pathInContext);
|
||||
}
|
||||
|
@ -110,7 +119,6 @@ public class ContextScopeListenerTest
|
|||
{
|
||||
String pathInContext = (request == null) ? "null" : URIUtil.addPaths(request.getServletPath(), request.getPathInfo());
|
||||
_history.add("exitScope " + pathInContext);
|
||||
_lock.unlock();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -120,14 +128,10 @@ public class ContextScopeListenerTest
|
|||
assertHistory(
|
||||
"enterScope /initialPath",
|
||||
"doGet",
|
||||
"exitScope /initialPath",
|
||||
"enterScope /initialPath",
|
||||
"asyncRunnable",
|
||||
"exitScope /initialPath",
|
||||
"enterScope /dispatch",
|
||||
"asyncDispatch",
|
||||
"exitScope /dispatch",
|
||||
"enterScope /dispatch",
|
||||
"exitScope /dispatch"
|
||||
);
|
||||
}
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
<?xml version="1.0"?><!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://www.eclipse.org/jetty/configure_10_0.dtd">
|
||||
|
||||
<!-- =============================================================== -->
|
||||
<!-- Create the deployment manager -->
|
||||
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
|
||||
<!-- The deplyment manager handles the lifecycle of deploying web -->
|
||||
<!-- applications. Apps are provided by instances of the -->
|
||||
<!-- AppProvider interface. -->
|
||||
<!-- Attach the "ee9" environment app deployment provider -->
|
||||
<!-- =============================================================== -->
|
||||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<Call id="DeploymentManager" name="getBean">
|
||||
|
@ -20,7 +16,7 @@
|
|||
<Ref refid="DeploymentManager">
|
||||
<Call name="addAppProvider">
|
||||
<Arg>
|
||||
<New id="WebAppProvider" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<New id="WebAppProvider-ee9" class="org.eclipse.jetty.deploy.providers.ContextProvider">
|
||||
<Set name="EnvironmentName">ee9</Set>
|
||||
<Set name="monitoredDirName">
|
||||
<Call name="resolvePath" class="org.eclipse.jetty.xml.XmlConfiguration">
|
||||
|
@ -41,6 +37,7 @@
|
|||
</Default>
|
||||
</Property>
|
||||
</Set>
|
||||
<Set name="deferInitialScan" property="jetty.deploy.deferInitialScan"/>
|
||||
<Set name="scanInterval" property="jetty.deploy.scanInterval"/>
|
||||
<Set name="extractWars" property="jetty.deploy.extractWars" />
|
||||
<Set name="parentLoaderPriority" property="jetty.deploy.parentLoaderPriority" />
|
||||
|
|
Loading…
Reference in New Issue