Merge remote-tracking branch 'origin/jetty-11.0.x' into jetty-12.0.x

This commit is contained in:
Greg Wilkins 2022-07-04 11:32:33 +10:00
commit fe83ea87dc
48 changed files with 1845 additions and 1986 deletions

87
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@ -0,0 +1,87 @@
name: "CodeQL"
on:
push:
branches: [ 'jetty-10.[1-9]?[0-9].x', 'jetty-11.[1-9]?[0-9].x', 'jetty-12.[1-9]?[0-9].x' ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ 'jetty-10.[1-9]?[0-9].x', 'jetty-11.[1-9]?[0-9].x', 'jetty-12.[1-9]?[0-9].x' ]
schedule:
- cron: '22 1 * * 2'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'java', 'javascript' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
# Install and setup JDK 11
- name: Setup JDK 11
uses: actions/setup-java@v3
if: ${{
startsWith(github.ref, 'refs/heads/jetty-10.') ||
startsWith(github.ref, 'refs/heads/jetty-11.') ||
startsWith(github.base_ref, 'jetty-10.') ||
startsWith(github.base_ref, 'jetty-11.')
}}
with:
distribution: temurin
java-version: 11
cache: maven
# Install and setup JDK 17
- name: Setup JDK 17
uses: actions/setup-java@v3
if: ${{
startsWith(github.ref, 'refs/heads/jetty-12.') ||
startsWith(github.base_ref, 'jetty-12.')
}}
with:
distribution: temurin
java-version: 17
cache: maven
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

View File

@ -64,7 +64,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
this.retainableByteBufferPool = httpClient.getByteBufferPool().asRetainableByteBufferPool();
}
@Override

View File

@ -22,9 +22,9 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -708,6 +708,50 @@ public class HttpClientTLSTest
assertEquals(0, clientBytes.get());
}
protected class TestRetained extends ArrayRetainableByteBufferPool
{
private final ByteBufferPool _pool;
public TestRetained(ByteBufferPool pool, int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
super(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
_pool = pool;
}
@Override
protected ByteBuffer allocate(int capacity)
{
return _pool.acquire(capacity, false);
}
@Override
protected ByteBuffer allocateDirect(int capacity)
{
return _pool.acquire(capacity, true);
}
@Override
protected void removed(RetainableByteBuffer retainedBuffer)
{
_pool.release(retainedBuffer.getBuffer());
}
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
}
private class TestByteBufferPool extends ArrayByteBufferPool
{
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new TestRetained(this, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}
@Test
public void testEncryptedInputBufferRepooling() throws Exception
{
@ -715,15 +759,10 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
var retainableByteBufferPool = new ArrayRetainableByteBufferPool()
{
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
};
server.addBean(retainableByteBufferPool);
ArrayByteBufferPool byteBufferPool = new TestByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
@ -765,9 +804,12 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
Pool<RetainableByteBuffer> bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption());
Pool<RetainableByteBuffer> bucket = ((TestRetained)retainableByteBufferPool).poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());
long count = ssl.isDirectBuffersForDecryption() ? byteBufferPool.getDirectByteBufferCount() : byteBufferPool.getHeapByteBufferCount();
assertEquals(1, count);
}
@Test
@ -777,7 +819,7 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
@ -834,6 +876,7 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@ -845,7 +888,7 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
@ -917,6 +960,7 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@ -928,7 +972,7 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
@ -1000,6 +1044,7 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}

View File

@ -79,7 +79,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = client.getByteBufferPool().asRetainableByteBufferPool();
}
public HttpDestination getHttpDestination()

View File

@ -120,7 +120,7 @@ public class ServerGenerator extends Generator
private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = acquire(8);
ByteBuffer endRequestBuffer = acquire(16);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);

View File

@ -63,7 +63,7 @@ public class ServerFCGIConnection extends AbstractConnection implements Connecti
{
super(endPoint, connector.getExecutor());
this.connector = connector;
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
this.networkByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;

View File

@ -69,7 +69,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);

View File

@ -291,7 +291,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
RetainableByteBufferPool retainableByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();
HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector,
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);

View File

@ -57,7 +57,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.buffers = byteBufferPool.asRetainableByteBufferPool();
this.parser = parser;
parser.init(MessageListener::new);
}

View File

@ -14,10 +14,15 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -29,27 +34,64 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
@ManagedObject
abstract class AbstractByteBufferPool implements ByteBufferPool
{
public static final int DEFAULT_FACTOR = 4096;
public static final int DEFAULT_MAX_CAPACITY_BY_FACTOR = 16;
private final int _factor;
private final int _maxQueueLength;
private final int _maxCapacity;
private final int _maxBucketSize;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();
private final RetainableByteBufferPool _retainableByteBufferPool;
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
protected AbstractByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
_factor = factor <= 0 ? DEFAULT_FACTOR : factor;
_maxCapacity = maxCapacity > 0 ? maxCapacity : DEFAULT_MAX_CAPACITY_BY_FACTOR * _factor;
_maxBucketSize = maxBucketSize;
_maxHeapMemory = memorySize(maxHeapMemory);
_maxDirectMemory = memorySize(maxDirectMemory);
_retainableByteBufferPool = (retainedHeapMemory == -2 && retainedDirectMemory == -2)
? RetainableByteBufferPool.from(this)
: newRetainableByteBufferPool(factor, maxCapacity, maxBucketSize, retainedSize(retainedHeapMemory), retainedSize(retainedDirectMemory));
}
static long retainedSize(long size)
{
if (size == -2)
return 0;
return memorySize(size);
}
static long memorySize(long size)
{
if (size < 0)
return -1;
if (size == 0)
return Runtime.getRuntime().maxMemory() / 4;
return size;
}
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return RetainableByteBufferPool.from(this);
}
@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
return _retainableByteBufferPool;
}
protected int getCapacityFactor()
@ -57,9 +99,14 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
return _factor;
}
protected int getMaxQueueLength()
protected int getMaxCapacity()
{
return _maxQueueLength;
return _maxCapacity;
}
protected int getMaxBucketSize()
{
return _maxBucketSize;
}
@Deprecated
@ -123,6 +170,97 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
return memory.get();
}
protected static class Bucket
{
private final Queue<ByteBuffer> _queue = new ConcurrentLinkedQueue<>();
private final int _capacity;
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private final IntConsumer _memoryFunction;
@Deprecated
public Bucket(int capacity, int maxSize)
{
this(capacity, maxSize, i -> {});
}
public Bucket(int capacity, int maxSize, IntConsumer memoryFunction)
{
_capacity = capacity;
_maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
_memoryFunction = Objects.requireNonNull(memoryFunction);
}
public ByteBuffer acquire()
{
ByteBuffer buffer = _queue.poll();
if (buffer != null)
{
if (_size != null)
_size.decrementAndGet();
_memoryFunction.accept(-buffer.capacity());
}
return buffer;
}
public void release(ByteBuffer buffer)
{
resetUpdateTime();
BufferUtil.reset(buffer);
if (_size == null || _size.incrementAndGet() <= _maxSize)
{
_queue.offer(buffer);
_memoryFunction.accept(buffer.capacity());
}
else
{
_size.decrementAndGet();
}
}
void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}
public void clear()
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = acquire();
if (buffer == null)
break;
if (_size != null)
--size;
}
}
boolean isEmpty()
{
return _queue.isEmpty();
}
int size()
{
return _queue.size();
}
long getLastUpdate()
{
return _lastUpdate.getOpaque();
}
@Override
public String toString()
{
return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize);
}
}
IntConsumer updateMemory(boolean direct)
{
return (direct) ? _directMemory::addAndGet : _heapMemory::addAndGet;

View File

@ -31,9 +31,9 @@ import org.slf4j.LoggerFactory;
/**
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p>
* <p>Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers
* each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity
* 2048, and so on.</p>
* <p>Given a capacity {@code factor} of 4096, the first array element holds a bucket of ByteBuffers
* each of capacity 4096, the second array element holds a bucket of ByteBuffers each of capacity
* 8192, and so on.</p>
* <p>The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/
@ -44,8 +44,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
private final int _maxCapacity;
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private final Bucket[] _direct;
private final Bucket[] _indirect;
private boolean _detailedDump = false;
/**
@ -90,19 +90,35 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer queue length in a {@link Bucket}
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}
/**
* Creates a new ArrayByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxBucketSize the maximum ByteBuffer queue length in a {@link Bucket}
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
maxCapacity = getMaxCapacity();
factor = getCapacityFactor();
if (minCapacity <= 0)
minCapacity = 0;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_maxCapacity = maxCapacity;
@ -110,8 +126,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
// Initialize all buckets in constructor and never modify the array again.
int length = bucketFor(maxCapacity) + 1;
_direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length];
_direct = new Bucket[length];
_indirect = new Bucket[length];
for (int i = 0; i < length; i++)
{
_direct[i] = newBucket(i, true);
@ -119,11 +135,17 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
}
}
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new Retained(factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
int capacity = size < _minCapacity ? size : capacityFor(bucketFor(size));
ByteBufferPool.Bucket bucket = bucketFor(size, direct);
Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newByteBuffer(capacity, direct);
ByteBuffer buffer = bucket.acquire();
@ -152,7 +174,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
return;
boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct);
Bucket bucket = bucketFor(capacity, direct);
if (bucket != null)
{
bucket.release(buffer);
@ -162,7 +184,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(capacityFor(key), getMaxQueueLength(), updateMemory(direct));
return new Bucket(capacityFor(key), getMaxBucketSize(), updateMemory(direct));
}
@Override
@ -210,7 +232,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
return bucket * getCapacityFactor();
}
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct)
protected Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
@ -242,7 +264,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
}
// Package local for testing
ByteBufferPool.Bucket[] bucketsFor(boolean direct)
Bucket[] bucketsFor(boolean direct)
{
return direct ? _direct : _indirect;
}
@ -276,6 +298,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
dump.add("Indirect Buckets size=" + indirect.size());
dump.add("Direct Buckets size=" + direct.size());
}
dump.add(asRetainableByteBufferPool());
Dumpable.dumpObjects(out, indent, this, dump);
}
@ -286,7 +309,33 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
this.getClass().getSimpleName(), hashCode(),
_minCapacity,
_maxCapacity,
getMaxQueueLength(),
getMaxBucketSize(),
getCapacityFactor());
}
protected class Retained extends ArrayRetainableByteBufferPool
{
public Retained(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
super(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
@Override
protected ByteBuffer allocate(int capacity)
{
return ArrayByteBufferPool.this.acquire(capacity, false);
}
@Override
protected ByteBuffer allocateDirect(int capacity)
{
return ArrayByteBufferPool.this.acquire(capacity, true);
}
@Override
protected void removed(RetainableByteBuffer retainedBuffer)
{
ArrayByteBufferPool.this.release(retainedBuffer.getBuffer());
}
}
}

View File

@ -39,13 +39,14 @@ import org.slf4j.LoggerFactory;
* <p>The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/
@SuppressWarnings("resource")
@ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);
private final Bucket[] _direct;
private final Bucket[] _indirect;
private final RetainedBucket[] _direct;
private final RetainedBucket[] _indirect;
private final int _minCapacity;
private final int _maxCapacity;
private final long _maxHeapMemory;
@ -109,34 +110,34 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
{
if (minCapacity <= 0)
minCapacity = 0;
factor = factor <= 0 ? AbstractByteBufferPool.DEFAULT_FACTOR : factor;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
int f = factor <= 0 ? 1024 : factor;
if ((maxCapacity % f) != 0 || f >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
maxCapacity = AbstractByteBufferPool.DEFAULT_MAX_CAPACITY_BY_FACTOR * factor;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException(String.format("The capacity factor(%d) must be a divisor of maxCapacity(%d)", factor, maxCapacity));
int f = factor;
if (bucketIndexFor == null)
bucketIndexFor = c -> (c - 1) / f;
if (bucketCapacity == null)
bucketCapacity = i -> (i + 1) * f;
int length = bucketIndexFor.apply(maxCapacity) + 1;
Bucket[] directArray = new Bucket[length];
Bucket[] indirectArray = new Bucket[length];
RetainedBucket[] directArray = new RetainedBucket[length];
RetainedBucket[] indirectArray = new RetainedBucket[length];
for (int i = 0; i < directArray.length; i++)
{
int capacity = Math.min(bucketCapacity.apply(i), maxCapacity);
directArray[i] = new Bucket(capacity, maxBucketSize);
indirectArray[i] = new Bucket(capacity, maxBucketSize);
directArray[i] = new RetainedBucket(capacity, maxBucketSize);
indirectArray[i] = new RetainedBucket(capacity, maxBucketSize);
}
_minCapacity = minCapacity;
_maxCapacity = maxCapacity;
_direct = directArray;
_indirect = indirectArray;
_maxHeapMemory = (maxHeapMemory != 0L) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0L) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
_maxHeapMemory = AbstractByteBufferPool.retainedSize(maxHeapMemory);
_maxDirectMemory = AbstractByteBufferPool.retainedSize(maxDirectMemory);
_bucketIndexFor = bucketIndexFor;
}
@ -155,20 +156,20 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
Bucket bucket = bucketFor(size, direct);
RetainedBucket bucket = bucketFor(size, direct);
if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Bucket.Entry entry = bucket.acquire();
return newRetainableByteBuffer(size, direct, this::removed);
RetainedBucket.Entry entry = bucket.acquire();
RetainableByteBuffer buffer;
if (entry == null)
{
Bucket.Entry reservedEntry = bucket.reserve();
RetainedBucket.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(bucket._capacity, direct, byteBuffer ->
buffer = newRetainableByteBuffer(bucket._capacity, direct, retainedBuffer ->
{
BufferUtil.reset(byteBuffer);
BufferUtil.reset(retainedBuffer.getBuffer());
reservedEntry.release();
});
reservedEntry.enable(buffer, true);
@ -180,7 +181,7 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
}
else
{
buffer = newRetainableByteBuffer(size, direct, byteBuffer -> {});
buffer = newRetainableByteBuffer(size, direct, this::removed);
}
}
else
@ -191,9 +192,23 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return buffer;
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<ByteBuffer> releaser)
protected ByteBuffer allocate(int capacity)
{
ByteBuffer buffer = direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
return ByteBuffer.allocate(capacity);
}
protected ByteBuffer allocateDirect(int capacity)
{
return ByteBuffer.allocateDirect(capacity);
}
protected void removed(RetainableByteBuffer retainedBuffer)
{
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
{
ByteBuffer buffer = direct ? allocateDirect(capacity) : allocate(capacity);
BufferUtil.clear(buffer);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(buffer, releaser);
retainableByteBuffer.acquire();
@ -205,12 +220,12 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return bucketFor(capacity, direct);
}
private Bucket bucketFor(int capacity, boolean direct)
private RetainedBucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int idx = _bucketIndexFor.apply(capacity);
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length)
return null;
return buckets[idx];
@ -230,8 +245,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private long getByteBufferCount(boolean direct)
{
Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Bucket::size).sum();
RetainedBucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(RetainedBucket::size).sum();
}
@ManagedAttribute("The number of pooled direct ByteBuffers that are available")
@ -248,7 +263,7 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private long getAvailableByteBufferCount(boolean direct)
{
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(bucket -> bucket.values().stream().filter(Pool.Entry::isIdle).count()).sum();
}
@ -286,9 +301,9 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private long getAvailableMemory(boolean direct)
{
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (Bucket bucket : buckets)
for (RetainedBucket bucket : buckets)
{
int capacity = bucket._capacity;
total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity;
@ -303,14 +318,17 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
clearArray(_indirect, _currentHeapMemory);
}
private void clearArray(Bucket[] poolArray, AtomicLong memoryCounter)
private void clearArray(RetainedBucket[] poolArray, AtomicLong memoryCounter)
{
for (Bucket pool : poolArray)
for (RetainedBucket pool : poolArray)
{
for (Bucket.Entry entry : pool.values())
for (RetainedBucket.Entry entry : pool.values())
{
entry.remove();
memoryCounter.addAndGet(-entry.getPooled().capacity());
if (entry.remove())
{
memoryCounter.addAndGet(-entry.getPooled().capacity());
removed(entry.getPooled());
}
}
}
}
@ -338,13 +356,13 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
long now = System.nanoTime();
long totalClearedCapacity = 0L;
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
while (totalClearedCapacity < excess)
{
for (Bucket bucket : buckets)
for (RetainedBucket bucket : buckets)
{
Bucket.Entry oldestEntry = findOldestEntry(now, bucket);
RetainedBucket.Entry oldestEntry = findOldestEntry(now, bucket);
if (oldestEntry == null)
continue;
@ -356,6 +374,7 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
else
_currentHeapMemory.addAndGet(-clearedCapacity);
totalClearedCapacity += clearedCapacity;
removed(oldestEntry.getPooled());
}
// else a concurrent thread evicted the same entry -> do not account for its capacity.
}
@ -389,8 +408,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
{
Bucket.Entry oldestEntry = null;
for (Bucket.Entry entry : bucket.values())
RetainedBucket.Entry oldestEntry = null;
for (RetainedBucket.Entry entry : bucket.values())
{
if (oldestEntry != null)
{
@ -406,11 +425,11 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return oldestEntry;
}
private static class Bucket extends Pool<RetainableByteBuffer>
private static class RetainedBucket extends Pool<RetainableByteBuffer>
{
private final int _capacity;
Bucket(int capacity, int size)
RetainedBucket(int capacity, int size)
{
super(Pool.StrategyType.THREAD_ID, size, true);
_capacity = capacity;

View File

@ -16,12 +16,6 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import org.eclipse.jetty.util.BufferUtil;
@ -43,7 +37,7 @@ public interface ByteBufferPool
* @return the requested buffer
* @see #release(ByteBuffer)
*/
public ByteBuffer acquire(int size, boolean direct);
ByteBuffer acquire(int size, boolean direct);
/**
* <p>Returns a {@link ByteBuffer}, usually obtained with {@link #acquire(int, boolean)}
@ -52,7 +46,7 @@ public interface ByteBufferPool
* @param buffer the buffer to return
* @see #acquire(int, boolean)
*/
public void release(ByteBuffer buffer);
void release(ByteBuffer buffer);
/**
* <p>Removes a {@link ByteBuffer} that was previously obtained with {@link #acquire(int, boolean)}.</p>
@ -78,7 +72,14 @@ public interface ByteBufferPool
return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}
public static class Lease
/**
* Get this pool as a {@link RetainableByteBufferPool}, which supports reference counting of the
* buffers and possibly a more efficient lookup mechanism based on the {@link org.eclipse.jetty.util.Pool} class.
* @return This pool as a RetainableByteBufferPool. The same instance is always returned by multiple calls to this method.
*/
RetainableByteBufferPool asRetainableByteBufferPool();
class Lease
{
private final ByteBufferPool byteBufferPool;
private final List<ByteBuffer> buffers;
@ -147,95 +148,4 @@ public interface ByteBufferPool
byteBufferPool.release(buffer);
}
}
public static class Bucket
{
private final Queue<ByteBuffer> _queue = new ConcurrentLinkedQueue<>();
private final int _capacity;
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private final IntConsumer _memoryFunction;
@Deprecated
public Bucket(int capacity, int maxSize)
{
this(capacity, maxSize, i -> {});
}
public Bucket(int capacity, int maxSize, IntConsumer memoryFunction)
{
_capacity = capacity;
_maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
_memoryFunction = Objects.requireNonNull(memoryFunction);
}
public ByteBuffer acquire()
{
ByteBuffer buffer = _queue.poll();
if (buffer != null)
{
if (_size != null)
_size.decrementAndGet();
_memoryFunction.accept(-buffer.capacity());
}
return buffer;
}
public void release(ByteBuffer buffer)
{
resetUpdateTime();
BufferUtil.reset(buffer);
if (_size == null || _size.incrementAndGet() <= _maxSize)
{
_queue.offer(buffer);
_memoryFunction.accept(buffer.capacity());
}
else
{
_size.decrementAndGet();
}
}
void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}
public void clear()
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = acquire();
if (buffer == null)
break;
if (_size != null)
--size;
}
}
boolean isEmpty()
{
return _queue.isEmpty();
}
int size()
{
return _queue.size();
}
long getLastUpdate()
{
return _lastUpdate.getOpaque();
}
@Override
public String toString()
{
return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize);
}
}
}

View File

@ -58,6 +58,13 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
addBean(delegate);
}
@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
// the retainable pool is just a client of the normal pool, so no special handling required.
return delegate.asRetainableByteBufferPool();
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{

View File

@ -65,7 +65,29 @@ public class LogarithmicArrayByteBufferPool extends ArrayByteBufferPool
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, 1, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory);
this(minCapacity, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param retainedHeapMemory the max heap memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(minCapacity, -1, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new LogarithmicRetainablePool(0, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
@Override
@ -106,4 +128,34 @@ public class LogarithmicArrayByteBufferPool extends ArrayByteBufferPool
bucket.resetUpdateTime();
}
}
/**
* A variant of the {@link ArrayRetainableByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
*/
public static class LogarithmicRetainablePool extends ArrayRetainableByteBufferPool
{
public LogarithmicRetainablePool()
{
this(0, -1, Integer.MAX_VALUE);
}
public LogarithmicRetainablePool(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}
public LogarithmicRetainablePool(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
-1,
maxCapacity,
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i);
}
}
}

View File

@ -71,43 +71,73 @@ public class MappedByteBufferPool extends AbstractByteBufferPool implements Dump
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer bucket size
*/
public MappedByteBufferPool(int factor, int maxQueueLength)
public MappedByteBufferPool(int factor, int maxBucketSize)
{
this(factor, maxQueueLength, null);
this(factor, maxBucketSize, null);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param newBucket the function that creates a Bucket
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket)
private MappedByteBufferPool(int factor, int maxBucketSize, Function<Integer, Bucket> newBucket)
{
this(factor, maxQueueLength, newBucket, 0, 0);
this(factor, maxBucketSize, newBucket, 0, 0, 0, 0);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param newBucket the function that creates a Bucket
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
public MappedByteBufferPool(int factor, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
this(factor, maxBucketSize, null, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
public MappedByteBufferPool(int factor, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
this(factor, maxBucketSize, null, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param newBucket the function that creates a Bucket
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
private MappedByteBufferPool(int factor, int maxBucketSize, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(factor, 0, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
_newBucket = newBucket;
}
private Bucket newBucket(int key, boolean direct)
{
return (_newBucket != null) ? _newBucket.apply(key) : new Bucket(capacityFor(key), getMaxQueueLength(), updateMemory(direct));
return (_newBucket != null) ? _newBucket.apply(key) : new Bucket(capacityFor(key), getMaxBucketSize(), updateMemory(direct));
}
@Override
@ -271,7 +301,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool implements Dump
{
return String.format("%s@%x{maxQueueLength=%s, factor=%s}",
this.getClass().getSimpleName(), hashCode(),
getMaxQueueLength(),
getMaxBucketSize(),
getCapacityFactor());
}
}

View File

@ -19,6 +19,8 @@ import org.eclipse.jetty.util.BufferUtil;
public class NullByteBufferPool implements ByteBufferPool
{
private final RetainableByteBufferPool _retainableByteBufferPool = RetainableByteBufferPool.from(this);
@Override
public ByteBuffer acquire(int size, boolean direct)
{
@ -33,4 +35,10 @@ public class NullByteBufferPool implements ByteBufferPool
{
BufferUtil.clear(buffer);
}
@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
return _retainableByteBufferPool;
}
}

View File

@ -37,10 +37,10 @@ public class RetainableByteBuffer implements Retainable
{
private final ByteBuffer buffer;
private final AtomicInteger references = new AtomicInteger();
private final Consumer<ByteBuffer> releaser;
private final Consumer<RetainableByteBuffer> releaser;
private final AtomicLong lastUpdate = new AtomicLong(System.nanoTime());
RetainableByteBuffer(ByteBuffer buffer, Consumer<ByteBuffer> releaser)
RetainableByteBuffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
{
this.releaser = releaser;
this.buffer = buffer;
@ -112,7 +112,7 @@ public class RetainableByteBuffer implements Retainable
if (ref == 0)
{
lastUpdate.setOpaque(System.nanoTime());
releaser.accept(buffer);
releaser.accept(this);
return true;
}
return false;

View File

@ -15,8 +15,6 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.component.Container;
/**
* <p>A {@link RetainableByteBuffer} pool.</p>
* <p>Acquired buffers <b>must</b> be released by calling {@link RetainableByteBuffer#release()} otherwise the memory they hold will
@ -32,27 +30,36 @@ public interface RetainableByteBufferPool
*/
RetainableByteBuffer acquire(int size, boolean direct);
/**
* Finds a {@link RetainableByteBufferPool} implementation in the given container, or wrap the given
* {@link ByteBufferPool} with an adapter.
* @param container the container to search for an existing memory pool.
* @param byteBufferPool the {@link ByteBufferPool} to wrap if no memory pool was found in the container.
* @return the {@link RetainableByteBufferPool} found or the wrapped one.
*/
static RetainableByteBufferPool findOrAdapt(Container container, ByteBufferPool byteBufferPool)
void clear();
static RetainableByteBufferPool from(ByteBufferPool byteBufferPool)
{
RetainableByteBufferPool retainableByteBufferPool = container == null ? null : container.getBean(RetainableByteBufferPool.class);
if (retainableByteBufferPool == null)
return new RetainableByteBufferPool()
{
// Wrap the ByteBufferPool instance.
retainableByteBufferPool = (size, direct) ->
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
ByteBuffer byteBuffer = byteBufferPool.acquire(size, direct);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(byteBuffer, byteBufferPool::release);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(byteBuffer, this::release);
retainableByteBuffer.acquire();
return retainableByteBuffer;
};
}
return retainableByteBufferPool;
}
private void release(RetainableByteBuffer retainedBuffer)
{
byteBufferPool.release(retainedBuffer.getBuffer());
}
@Override
public void clear()
{
}
@Override
public String toString()
{
return String.format("NonRetainableByteBufferPool@%x{%s}", hashCode(), byteBufferPool.toString());
}
};
}
}

View File

@ -174,7 +174,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine,
boolean useDirectBuffersForEncryption, boolean useDirectBuffersForDecryption)
{
this(RetainableByteBufferPool.findOrAdapt(null, byteBufferPool), byteBufferPool, executor, endPoint, sslEngine, useDirectBuffersForEncryption, useDirectBuffersForDecryption);
this(byteBufferPool.asRetainableByteBufferPool(), byteBufferPool, executor, endPoint, sslEngine, useDirectBuffersForEncryption, useDirectBuffersForDecryption);
}
public SslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine,

View File

@ -18,7 +18,7 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.io.AbstractByteBufferPool.Bucket;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;
@ -37,7 +37,7 @@ public class ArrayByteBufferPoolTest
public void testMinimumRelease()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 1; size <= 9; size++)
{
@ -45,7 +45,7 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertEquals(size, buffer.capacity());
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -53,7 +53,7 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer);
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -68,7 +68,7 @@ public class ArrayByteBufferPoolTest
int factor = 1;
int maxCapacity = 1024;
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(minCapacity, factor, maxCapacity);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = maxCapacity - 1; size <= maxCapacity + 1; size++)
{
@ -77,7 +77,7 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -87,7 +87,7 @@ public class ArrayByteBufferPoolTest
int pooled = Arrays.stream(buckets)
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.mapToInt(AbstractByteBufferPool.Bucket::size)
.sum();
if (size <= maxCapacity)
@ -101,7 +101,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireRelease()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 390; size <= 510; size++)
{
@ -110,7 +110,7 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -120,7 +120,7 @@ public class ArrayByteBufferPoolTest
int pooled = Arrays.stream(buckets)
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.mapToInt(AbstractByteBufferPool.Bucket::size)
.sum();
assertEquals(1, pooled);
}
@ -130,7 +130,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireReleaseAcquire()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 390; size <= 510; size++)
{
@ -144,7 +144,7 @@ public class ArrayByteBufferPoolTest
int pooled = Arrays.stream(buckets)
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.mapToInt(AbstractByteBufferPool.Bucket::size)
.sum();
assertEquals(1, pooled);

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
@ -26,7 +27,9 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -238,7 +241,7 @@ public class ArrayRetainableByteBufferPoolTest
pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(2048L));
assertThat(pool.getDirectMemory(), is(2L * AbstractByteBufferPool.DEFAULT_FACTOR));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
@ -282,16 +285,16 @@ public class ArrayRetainableByteBufferPoolTest
{
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(buf1, is(notNullValue()));
assertThat(buf1.capacity(), is(1024));
assertThat(buf1.capacity(), is(AbstractByteBufferPool.DEFAULT_FACTOR));
RetainableByteBuffer buf2 = pool.acquire(10, true);
assertThat(buf2, is(notNullValue()));
assertThat(buf2.capacity(), is(1024));
assertThat(buf2.capacity(), is(AbstractByteBufferPool.DEFAULT_FACTOR));
buf1.release();
buf2.release();
RetainableByteBuffer buf3 = pool.acquire(16384 + 1, true);
assertThat(buf3, is(notNullValue()));
assertThat(buf3.capacity(), is(16384 + 1024));
assertThat(buf3.capacity(), is(16384 + AbstractByteBufferPool.DEFAULT_FACTOR));
buf3.release();
RetainableByteBuffer buf4 = pool.acquire(32768, true);
@ -307,7 +310,7 @@ public class ArrayRetainableByteBufferPoolTest
assertThat(pool.getDirectByteBufferCount(), is(4L));
assertThat(pool.getHeapByteBufferCount(), is(1L));
assertThat(pool.getDirectMemory(), is(1024 + 1024 + 16384 + 1024 + 32768L));
assertThat(pool.getDirectMemory(), is(AbstractByteBufferPool.DEFAULT_FACTOR * 3L + 16384 + 32768L));
assertThat(pool.getHeapMemory(), is(32768L));
pool.clear();
@ -391,4 +394,29 @@ public class ArrayRetainableByteBufferPoolTest
assertThat(buffer.release(), is(true));
assertThat(buffer.getBuffer().order(), Matchers.is(ByteOrder.BIG_ENDIAN));
}
@Test
public void testLogarithmic()
{
LogarithmicArrayByteBufferPool pool = new LogarithmicArrayByteBufferPool();
ByteBuffer buffer5 = pool.acquire(5, false);
pool.release(buffer5);
ByteBuffer buffer6 = pool.acquire(6, false);
assertThat(buffer6, sameInstance(buffer5));
pool.release(buffer6);
ByteBuffer buffer9 = pool.acquire(9, false);
assertThat(buffer9, not(sameInstance(buffer5)));
pool.release(buffer9);
RetainableByteBufferPool retainablePool = pool.asRetainableByteBufferPool();
RetainableByteBuffer retain5 = retainablePool.acquire(5, false);
retain5.release();
RetainableByteBuffer retain6 = retainablePool.acquire(6, false);
assertThat(retain6, sameInstance(retain5));
retain6.release();
RetainableByteBuffer retain9 = retainablePool.acquire(9, false);
assertThat(retain9, not(sameInstance(retain5)));
retain9.release();
}
}

View File

@ -16,7 +16,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.io.AbstractByteBufferPool.Bucket;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;
@ -138,7 +138,7 @@ public class MappedByteBufferPoolTest
{
int factor = 1024;
int maxMemory = 11 * factor;
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory);
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, -1, maxMemory);
ConcurrentMap<Integer, Bucket> buckets = bufferPool.bucketsFor(true);
// Create the buckets - the oldest is the larger.

View File

@ -4,8 +4,10 @@
<New id="byteBufferPool" class="org.eclipse.jetty.io.LogarithmicArrayByteBufferPool">
<Arg type="int"><Property name="jetty.byteBufferPool.minCapacity" default="0"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxBucketSize" deprecated="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedDirectMemory" default="0"/></Arg>
</New>
</Configure>

View File

@ -3,10 +3,12 @@
<Configure>
<New id="byteBufferPool" class="org.eclipse.jetty.io.ArrayByteBufferPool">
<Arg type="int"><Property name="jetty.byteBufferPool.minCapacity" default="0"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.factor" default="1024"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.factor" default="4096"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="-1"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxBucketSize" deprecated="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedDirectMemory" default="0"/></Arg>
</New>
</Configure>

View File

@ -20,11 +20,17 @@ etc/jetty-bytebufferpool-logarithmic.xml
## Maximum capacity to pool ByteBuffers
#jetty.byteBufferPool.maxCapacity=65536
## Maximum queue length for each bucket (-1 for unbounded)
#jetty.byteBufferPool.maxQueueLength=-1
## Maximum size for each bucket (-1 for unbounded)
#jetty.byteBufferPool.maxBucketSize=-1
## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited)
## Maximum heap memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxHeapMemory=0
## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited)
## Maximum direct memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxDirectMemory=0
## Maximum heap memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedHeapMemory=0
## Maximum direct memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedDirectMemory=0

View File

@ -1,5 +1,6 @@
[description]
Configures the ByteBufferPool used by ServerConnectors.
Use module "bytebufferpool-logarithmic" for a pool may hold less granulated sized buffers.
[depends]
logging
@ -19,13 +20,19 @@ etc/jetty-bytebufferpool.xml
## Bucket capacity factor.
## ByteBuffers are allocated out of buckets that have
## a capacity that is multiple of this factor.
#jetty.byteBufferPool.factor=1024
#jetty.byteBufferPool.factor=4096
## Maximum queue length for each bucket (-1 for unbounded).
#jetty.byteBufferPool.maxQueueLength=-1
## Maximum size for each bucket (-1 for unbounded).
#jetty.byteBufferPool.maxBucketSize=-1
## Maximum heap memory retainable by the pool (-1 for unlimited).
#jetty.byteBufferPool.maxHeapMemory=-1
## Maximum heap memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxHeapMemory=0
## Maximum direct memory retainable by the pool (-1 for unlimited).
#jetty.byteBufferPool.maxDirectMemory=-1
## Maximum direct memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxDirectMemory=0
## Maximum heap memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedHeapMemory=0
## Maximum direct memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedDirectMemory=0

View File

@ -32,10 +32,9 @@ import java.util.concurrent.locks.Condition;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.internal.HttpConnection;
import org.eclipse.jetty.util.ProcessorUtils;
@ -185,12 +184,26 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
scheduler = _server.getBean(Scheduler.class);
_scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
addBean(_scheduler);
if (pool == null)
pool = _server.getBean(ByteBufferPool.class);
_byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
addBean(_byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = _server.getBean(RetainableByteBufferPool.class);
addBean(retainableByteBufferPool == null ? new ArrayRetainableByteBufferPool.ExponentialPool() : retainableByteBufferPool, retainableByteBufferPool == null);
synchronized (server)
{
if (pool == null)
{
// Look for (and cache) a common pool on the server
pool = server.getBean(ByteBufferPool.class);
if (pool == null)
{
pool = new LogarithmicArrayByteBufferPool();
server.addBean(pool, true);
}
addBean(pool, false);
}
else
{
addBean(pool, true);
}
}
_byteBufferPool = pool;
for (ConnectionFactory factory : factories)
{

View File

@ -168,7 +168,7 @@ public class SslConnectionFactory extends AbstractConnectionFactory implements C
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption());
}

View File

@ -19,6 +19,7 @@ import java.util.Arrays;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.LocalConnector;
@ -44,6 +45,7 @@ public class BufferedResponseHandlerTest
public void setUp() throws Exception
{
_server = new Server();
_server.addBean(new NullByteBufferPool()); // Avoid giving larger buffers than requested
HttpConfiguration config = new HttpConfiguration();
config.setOutputBufferSize(1024);
config.setOutputAggregationSize(256);

View File

@ -29,6 +29,7 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
@ -183,7 +184,7 @@ public class Main
public void invokeMain(ClassLoader classloader, StartArgs args) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException, IOException
{
if (args.getEnabledModules().isEmpty())
if (args.getSelectedModules().isEmpty())
{
if (Files.exists(getBaseHome().getBasePath("start.jar")))
StartLog.error("Do not start with ${jetty.base} == ${jetty.home}!");
@ -334,12 +335,22 @@ public class Main
base.source);
// 4) Active Module Resolution
for (String enabledModule : modules.getSortedNames(args.getEnabledModules()))
List<String> selectedModules = args.getSelectedModules();
List<String> sortedSelectedModules = modules.getSortedNames(selectedModules);
List<String> unknownModules = new ArrayList<>(selectedModules);
unknownModules.removeAll(sortedSelectedModules);
if (unknownModules.size() >= 1)
{
for (String source : args.getSources(enabledModule))
throw new UsageException(UsageException.ERR_UNKNOWN, "Unknown module%s=[%s] List available with --list-modules",
unknownModules.size() > 1 ? 's' : "",
String.join(", ", unknownModules));
}
for (String selectedModule : sortedSelectedModules)
{
for (String source : args.getSources(selectedModule))
{
String shortForm = baseHome.toShortForm(source);
modules.enable(enabledModule, shortForm);
modules.enable(selectedModule, shortForm);
}
}
@ -470,8 +481,7 @@ public class Main
CommandLineBuilder cmd = args.getMainArgs(StartArgs.ALL_PARTS);
cmd.debug();
List<String> execModules = args.getEnabledModules().stream()
.map(name -> args.getAllModules().get(name))
List<String> execModules = args.getAllModules().getEnabled().stream()
// Keep only the forking modules.
.filter(module -> !module.getJvmArgs().isEmpty())
.map(Module::getName)

View File

@ -390,7 +390,10 @@ public class Modules implements Iterable<Module>
public List<String> getSortedNames(Set<String> enabledModules)
{
return getSortedAll().stream().map(Module::getName).filter(enabledModules::contains).collect(Collectors.toList());
return getSortedAll().stream()
.map(Module::getName)
.filter(enabledModules::contains)
.collect(Collectors.toList());
}
/**

View File

@ -462,7 +462,29 @@ public class StartArgs
return allModules;
}
public Set<String> getEnabledModules()
/**
* @deprecated use {@link #getSelectedModules()} instead
*/
@Deprecated
public List<String> getEnabledModules()
{
return getSelectedModules();
}
/**
* <p>
* The list of selected Modules to enable based on configuration
* obtained from {@code start.d/*.ini}, {@code start.ini}, and command line.
* </p>
*
* <p>
* For full list of enabled modules, use {@link Modules#getEnabled()}
* </p>
*
* @return the list of selected modules (by name) that the configuration has.
* @see Modules#getEnabled()
*/
public List<String> getSelectedModules()
{
return this.modules;
}
@ -1176,11 +1198,11 @@ public class StartArgs
return environment;
}
// Enable a module
// Select a module to eventually be enabled
if (arg.startsWith("--module="))
{
List<String> moduleNames = Props.getValues(arg);
enableModules(source, moduleNames);
selectModules(source, moduleNames);
Module module = getAllModules().get(moduleNames.get(moduleNames.size() - 1));
String envName = module.getEnvironment();
return envName == null ? coreEnvironment : getEnvironment(envName);
@ -1334,7 +1356,7 @@ public class StartArgs
setProperty(environment, key, value, source);
}
private void enableModules(String source, List<String> moduleNames)
private void selectModules(String source, List<String> moduleNames)
{
for (String moduleName : moduleNames)
{

View File

@ -71,7 +71,7 @@ public class MavenMetadataTest
@Test
public void testIsExpiredTimestampNow()
{
LocalDateTime now = LocalDateTime.now();
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
String timestamp = getTimestampFormatter().format(now);
assertFalse(MavenMetadata.isExpiredTimestamp(timestamp), "Timestamp should NOT be stale: " + timestamp);
}

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import org.eclipse.jetty.start.Props;
import org.eclipse.jetty.start.UsageException;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.PathAssert;
@ -34,10 +35,12 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class BasicTest extends AbstractUseCase
{
@ -110,6 +113,59 @@ public class BasicTest extends AbstractUseCase
assertThat("System.getProperty(jetty.base)", System.getProperty("jetty.base"), is(not(startsWith("file:"))));
}
@Test
public void testAddModuleDoesNotExist() throws Exception
{
setupDistHome();
Files.write(baseDir.resolve("start.ini"),
List.of(
"--module=main",
"--module=does-not-exist"
),
StandardCharsets.UTF_8);
// === Execute Main
List<String> runArgs = new ArrayList<>();
runArgs.add("--create-files");
UsageException usage = assertThrows(UsageException.class, () ->
{
ExecResults results = exec(runArgs, true);
if (results.exception != null)
{
throw results.exception;
}
});
assertThat(usage.getMessage(), containsString("Unknown module=[does-not-exist]"));
}
@Test
public void testAddModuleDoesNotExistMultiple() throws Exception
{
setupDistHome();
Files.write(baseDir.resolve("start.ini"),
List.of(
"--module=main",
"--module=does-not-exist",
"--module=also-not-present"
),
StandardCharsets.UTF_8);
// === Execute Main
List<String> runArgs = new ArrayList<>();
runArgs.add("--create-files");
UsageException usage = assertThrows(UsageException.class, () ->
{
ExecResults results = exec(runArgs, true);
if (results.exception != null)
{
throw results.exception;
}
});
assertThat(usage.getMessage(), containsString("Unknown modules=[does-not-exist, also-not-present]"));
}
@Test
public void testProvidersUsingDefault() throws Exception
{

View File

@ -339,7 +339,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
return null;
// If we have no space
if (entries.size() >= maxEntries)
if (maxEntries > 0 && entries.size() >= maxEntries)
return null;
Entry entry = newEntry();

View File

@ -440,7 +440,7 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
HttpClient httpClient = wsClient.getHttpClient();
ByteBufferPool bufferPool = wsClient.getWebSocketComponents().getBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(wsClient.getWebSocketComponents(), bufferPool);
RetainableByteBufferPool retainableByteBufferPool = bufferPool.asRetainableByteBufferPool();
WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, retainableByteBufferPool, coreSession);
wsClient.getEventListeners().forEach(wsConnection::addEventListener);
coreSession.setWebSocketConnection(wsConnection);

View File

@ -82,7 +82,7 @@ public final class RFC6455Handshaker extends AbstractHandshaker
ConnectionMetaData connectionMetaData = baseRequest.getConnectionMetaData();
Connector connector = connectionMetaData.getConnector();
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
return newWebSocketConnection(connectionMetaData.getConnection().getEndPoint(), connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}

View File

@ -64,7 +64,7 @@ public class RFC8441Handshaker extends AbstractHandshaker
Connector connector = connectionMetaData.getConnector();
EndPoint endPoint = null; // TODO: connectionMetaData.getTunnellingEndPoint();
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}

View File

@ -18,6 +18,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileStore;
@ -189,6 +190,7 @@ public class HugeResourceTest
context.setBaseResource(staticBase);
context.addServlet(PostServlet.class, "/post");
context.addServlet(ChunkedServlet.class, "/chunked/*");
String location = multipartTempDir.toString();
long maxFileSize = Long.MAX_VALUE;
@ -224,7 +226,7 @@ public class HugeResourceTest
@ParameterizedTest
@MethodSource("staticFiles")
public void testDownload(String filename, long expectedSize) throws Exception
public void testDownloadStatic(String filename, long expectedSize) throws Exception
{
URI destUri = server.getURI().resolve("/" + filename);
InputStreamResponseListener responseListener = new InputStreamResponseListener();
@ -251,7 +253,33 @@ public class HugeResourceTest
@ParameterizedTest
@MethodSource("staticFiles")
public void testHead(String filename, long expectedSize) throws Exception
public void testDownloadChunked(String filename, long expectedSize) throws Exception
{
URI destUri = server.getURI().resolve("/chunked/" + filename);
InputStreamResponseListener responseListener = new InputStreamResponseListener();
Request request = client.newRequest(destUri)
.method(HttpMethod.GET);
request.send(responseListener);
Response response = responseListener.get(5, TimeUnit.SECONDS);
assertThat("HTTP Response Code", response.getStatus(), is(200));
// dumpResponse(response);
String transferEncoding = response.getHeaders().get(HttpHeader.TRANSFER_ENCODING);
assertThat("Http Response Header: \"Transfer-Encoding\"", transferEncoding, is("chunked"));
try (ByteCountingOutputStream out = new ByteCountingOutputStream();
InputStream in = responseListener.getInputStream())
{
IO.copy(in, out);
assertThat("Downloaded Files Size: " + filename, out.getCount(), is(expectedSize));
}
}
@ParameterizedTest
@MethodSource("staticFiles")
public void testHeadStatic(String filename, long expectedSize) throws Exception
{
URI destUri = server.getURI().resolve("/" + filename);
InputStreamResponseListener responseListener = new InputStreamResponseListener();
@ -274,6 +302,30 @@ public class HugeResourceTest
assertThat("Http Response Header: \"Content-Length: " + contentLength + "\"", contentLengthLong, is(expectedSize));
}
@ParameterizedTest
@MethodSource("staticFiles")
public void testHeadChunked(String filename, long expectedSize) throws Exception
{
URI destUri = server.getURI().resolve("/chunked/" + filename);
InputStreamResponseListener responseListener = new InputStreamResponseListener();
Request request = client.newRequest(destUri)
.method(HttpMethod.HEAD);
request.send(responseListener);
Response response = responseListener.get(5, TimeUnit.SECONDS);
try (InputStream in = responseListener.getInputStream())
{
assertThat(in.read(), is(-1));
}
assertThat("HTTP Response Code", response.getStatus(), is(200));
// dumpResponse(response);
String transferEncoding = response.getHeaders().get(HttpHeader.TRANSFER_ENCODING);
assertThat("Http Response Header: \"Transfer-Encoding\"", transferEncoding, is("chunked"));
}
@ParameterizedTest
@MethodSource("staticFiles")
public void testUpload(String filename, long expectedSize) throws Exception
@ -360,6 +412,22 @@ public class HugeResourceTest
}
}
public static class ChunkedServlet extends HttpServlet
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException
{
URL resource = req.getServletContext().getResource(req.getPathInfo());
OutputStream output = resp.getOutputStream();
try (InputStream input = resource.openStream())
{
resp.setContentType("application/octet-stream");
resp.flushBuffer();
IO.copy(input, output);
}
}
}
public static class MultipartServlet extends HttpServlet
{
@Override

View File

@ -227,46 +227,39 @@ public class AsyncCompletionTest extends HttpServerTestFixture
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
// wait for OWP to execute (proves we do not block in write APIs)
boolean completeCalled = handler.waitForOWPExit();
while (true)
{
// wait for threads to return to base level (proves we are really async)
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
Boolean owpExit = handler.pollForOWPExit();
if (owpExit == null)
{
// handle any callback written so far
while (delay != null)
{
delay.proceed();
delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
}
continue;
}
// OWP has exited, but we have a delay, so let's wait for thread to return to the pool to ensure we are async.
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (_threadPool.getBusyThreads() != base)
while (delay != null && _threadPool.getBusyThreads() > base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(POLL);
}
if (completeCalled)
break;
// We are now asynchronously waiting!
assertThat(__transportComplete.get(), is(false));
// If we are not complete, we must be waiting for one or more writes to complete
while (true)
// handle any callback written so far
while (delay != null)
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
{
delay.proceed();
continue;
}
// No delay callback found, have we finished OWP again?
Boolean c = handler.pollForOWPExit();
if (c == null)
// No we haven't, so look for another delay callback
continue;
// We have a OWP result, so let's handle it.
completeCalled = c;
break;
delay.proceed();
delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
}
if (owpExit)
break;
}
// Wait for full completion

View File

@ -31,9 +31,9 @@ import jakarta.servlet.ServletException;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.BufferUtil;
@ -71,19 +71,7 @@ public class HttpOutputTest
_server = new Server();
_contextHandler = new ContextHandler(_server, "/");
_server.addBean(new ByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size);
}
@Override
public void release(ByteBuffer buffer)
{
}
});
_server.addBean(new NullByteBufferPool());
HttpConnectionFactory http = new HttpConnectionFactory();
http.getHttpConfiguration().setRequestHeaderSize(1024);

View File

@ -12,7 +12,7 @@
<packaging>pom</packaging>
<properties>
<osgi-version>3.17.100</osgi-version>
<osgi-version>3.18.0</osgi-version>
<osgi-services-version>3.10.200</osgi-services-version>
<osgi-util-version>3.6.100</osgi-util-version>
<equinox-http-servlet-version>1.0.0-v20070606</equinox-http-servlet-version>

View File

@ -13,7 +13,7 @@
<name>Jetty :: GCloud</name>
<properties>
<gcloud.version>2.7.0</gcloud.version>
<gcloud.version>2.10.0</gcloud.version>
</properties>
<modules>

View File

@ -47,5 +47,25 @@
<version>${project.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-alpn</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-boot</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-boot-jsp</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-boot-warurl</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-httpservice</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,917 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500;
/**
* <p>A {@link Connection} that handles the HTTP protocol.</p>
*/
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, WriteFlusher.Listener, Connection.UpgradeFrom, Connection.UpgradeTo
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
private final HttpConfiguration _config;
private final Connector _connector;
private final ByteBufferPool _bufferPool;
private final RetainableByteBufferPool _retainableByteBufferPool;
private final HttpInput _input;
private final HttpGenerator _generator;
private final HttpChannelOverHttp _channel;
private final HttpParser _parser;
private volatile RetainableByteBuffer _retainableByteBuffer;
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final boolean _recordHttpComplianceViolations;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
private boolean _useInputDirectByteBuffers;
private boolean _useOutputDirectByteBuffers;
/**
* Get the current connection that this thread is dispatched to.
* Note that a thread may be processing a request asynchronously and
* thus not be dispatched to the connection.
*
* @return the current HttpConnection or null
* @see Request#getAttribute(String) for a more general way to access the HttpConnection
*/
public static HttpConnection getCurrentConnection()
{
return __currentConnection.get();
}
protected static HttpConnection setCurrentConnection(HttpConnection connection)
{
HttpConnection last = __currentConnection.get();
__currentConnection.set(connection);
return last;
}
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint, boolean recordComplianceViolations)
{
super(endPoint, connector.getExecutor());
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_retainableByteBufferPool = _bufferPool.asRetainableByteBufferPool();
_generator = newHttpGenerator();
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();
_parser = newHttpParser(config.getHttpCompliance());
_recordHttpComplianceViolations = recordComplianceViolations;
if (LOG.isDebugEnabled())
LOG.debug("New HTTP Connection {}", this);
}
public HttpConfiguration getHttpConfiguration()
{
return _config;
}
public boolean isRecordHttpComplianceViolations()
{
return _recordHttpComplianceViolations;
}
protected HttpGenerator newHttpGenerator()
{
return new HttpGenerator(_config.getSendServerVersion(), _config.getSendXPoweredBy());
}
protected HttpChannelOverHttp newHttpChannel()
{
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
}
protected HttpParser newHttpParser(HttpCompliance compliance)
{
HttpParser parser = new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize(), compliance);
parser.setHeaderCacheSize(getHttpConfiguration().getHeaderCacheSize());
parser.setHeaderCacheCaseSensitive(getHttpConfiguration().isHeaderCacheCaseSensitive());
return parser;
}
protected HttpParser.RequestHandler newRequestHandler()
{
return _channel;
}
public Server getServer()
{
return _connector.getServer();
}
public Connector getConnector()
{
return _connector;
}
public HttpChannel getHttpChannel()
{
return _channel;
}
public HttpParser getParser()
{
return _parser;
}
public HttpGenerator getGenerator()
{
return _generator;
}
@Override
public long getMessagesIn()
{
return getHttpChannel().getRequests();
}
@Override
public long getMessagesOut()
{
return getHttpChannel().getRequests();
}
public boolean isUseInputDirectByteBuffers()
{
return _useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
_useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return _useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
_useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@Override
public ByteBuffer onUpgradeFrom()
{
if (!isRequestBufferEmpty())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
}
return null;
}
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
BufferUtil.append(getRequestBuffer(), buffer);
}
@Override
public void onFlushed(long bytes) throws IOException
{
// Unfortunately cannot distinguish between header and content
// bytes, and for content bytes whether they are chunked or not.
_channel.getResponse().getHttpOutput().onFlushed(bytes);
}
void releaseRequestBuffer()
{
if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
if (_retainableByteBuffer.release())
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
}
}
private ByteBuffer getRequestBuffer()
{
if (_retainableByteBuffer == null)
_retainableByteBuffer = _retainableByteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
return _retainableByteBuffer.getBuffer();
}
public boolean isRequestBufferEmpty()
{
return _retainableByteBuffer == null || _retainableByteBuffer.isEmpty();
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable enter {} {}", this, _channel.getState(), _retainableByteBuffer);
HttpConnection last = setCurrentConnection(this);
try
{
while (getEndPoint().isOpen())
{
// Fill the request buffer (if needed).
int filled = fillRequestBuffer();
if (filled < 0 && getEndPoint().isOutputShutdown())
close();
// Parse the request buffer.
boolean handle = parseRequestBuffer();
// There could be a connection upgrade before handling
// the HTTP/1.1 request, for example PRI * HTTP/2.
// If there was a connection upgrade, the other
// connection took over, nothing more to do here.
if (getEndPoint().getConnection() != this)
break;
// Handle channel event
if (handle)
{
boolean suspended = !_channel.handle();
// We should break iteration if we have suspended or upgraded the connection.
if (suspended || getEndPoint().getConnection() != this)
break;
}
else if (filled == 0)
{
fillInterested();
break;
}
else if (filled < 0)
{
if (_channel.getState().isIdle())
getEndPoint().shutdownOutput();
break;
}
}
}
catch (Throwable x)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("{} caught exception {}", this, _channel.getState(), x);
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.clear();
releaseRequestBuffer();
}
}
finally
{
getEndPoint().close(x);
}
}
finally
{
setCurrentConnection(last);
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable exit {} {}", this, _channel.getState(), _retainableByteBuffer);
}
}
/**
* Parse and fill data, looking for content.
* We do parse first, and only fill if we're out of bytes to avoid unnecessary system calls.
*/
void parseAndFillForContent()
{
// Defensive check to avoid an infinite select/wakeup/fillAndParseForContent/wait loop
// in case the parser was mistakenly closed and the connection was not aborted.
if (_parser.isTerminated())
throw new IllegalStateException("Parser is terminated: " + _parser);
// When fillRequestBuffer() is called, it must always be followed by a parseRequestBuffer() call otherwise this method
// doesn't trigger EOF/earlyEOF which breaks AsyncRequestReadTest.testPartialReadThenShutdown().
// This loop was designed by a committee and voted by a majority.
while (_parser.inContentState())
{
if (parseRequestBuffer())
break;
// Re-check the parser state after parsing to avoid filling,
// otherwise fillRequestBuffer() would acquire a ByteBuffer
// that may be leaked.
if (_parser.inContentState() && fillRequestBuffer() <= 0)
break;
}
}
private int fillRequestBuffer()
{
if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained())
throw new IllegalStateException("fill with unconsumed content on " + this);
if (isRequestBufferEmpty())
{
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
ByteBuffer requestBuffer = getRequestBuffer();
// fill
try
{
int filled = getEndPoint().fill(requestBuffer);
if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(requestBuffer);
if (filled > 0)
bytesIn.add(filled);
else if (filled < 0)
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}", this, filled, _retainableByteBuffer);
return filled;
}
catch (IOException e)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to fill from endpoint {}", getEndPoint(), e);
_parser.atEOF();
return -1;
}
}
return 0;
}
private boolean parseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parse {}", this, _retainableByteBuffer);
boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getBuffer());
if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}", this, handle, _parser);
// recycle buffer ?
if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained())
releaseRequestBuffer();
return handle;
}
private boolean upgrade()
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection == null)
return false;
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection);
_channel.getState().upgrade();
getEndPoint().upgrade(connection);
_channel.recycle();
_parser.reset();
_generator.reset();
if (_retainableByteBuffer != null)
{
if (!_retainableByteBuffer.isRetained())
{
releaseRequestBuffer();
}
else
{
LOG.warn("{} lingering content references?!?!", this);
_retainableByteBuffer = null; // Not returned to pool!
}
}
return true;
}
@Override
public void onCompleted()
{
// If we are fill interested, then a read is pending and we must abort
if (isFillInterested())
{
LOG.warn("Pending read in onCompleted {} {}", this, getEndPoint());
_channel.abort(new IOException("Pending read in onCompleted"));
}
else
{
// Handle connection upgrades.
if (upgrade())
return;
}
// Drive to EOF, EarlyEOF or Error
boolean complete = _input.consumeAll();
// Finish consuming the request
// If we are still expecting
if (_channel.isExpecting100Continue())
{
// close to seek EOF
_parser.close();
}
// else abort if we can't consume all
else if (_generator.isPersistent() && !complete)
{
if (LOG.isDebugEnabled())
LOG.debug("unconsumed input {} {}", this, _parser);
_channel.abort(new IOException("unconsumed input"));
}
// Reset the channel, parsers and generator
_channel.recycle();
if (!_parser.isClosed())
{
if (_generator.isPersistent())
_parser.reset();
else
_parser.close();
}
_generator.reset();
// if we are not called from the onfillable thread, schedule completion
if (getCurrentConnection() != this)
{
// If we are looking for the next request
if (_parser.isStart())
{
// if the buffer is empty
if (isRequestBufferEmpty())
{
// look for more data
fillInterested();
}
// else if we are still running
else if (getConnector().isRunning())
{
// Dispatched to handle a pipelined request
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isRunning())
LOG.warn("Failed dispatch of {}", this, e);
else
LOG.trace("IGNORED", e);
getEndPoint().close();
}
}
else
{
getEndPoint().close();
}
}
// else the parser must be closed, so seek the EOF if we are still open
else if (getEndPoint().isOpen())
fillInterested();
}
}
@Override
protected boolean onReadTimeout(Throwable timeout)
{
return _channel.onIdleTimeout(timeout);
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
_parser.close();
super.onFillInterestedFailed(cause);
}
@Override
public void onOpen()
{
super.onOpen();
if (isRequestBufferEmpty())
fillInterested();
else
getExecutor().execute(this);
}
@Override
public void onClose(Throwable cause)
{
if (cause == null)
_sendCallback.close();
else
_sendCallback.failed(cause);
super.onClose(cause);
}
@Override
public void run()
{
onFillable();
}
@Override
public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback)
{
if (response == null)
{
if (!lastContent && BufferUtil.isEmpty(content))
{
callback.succeeded();
return;
}
}
else
{
// If we are still expecting a 100 continues when we commit
if (_channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
}
if (_sendCallback.reset(request, response, content, lastContent, callback))
{
_sendCallback.iterate();
}
}
HttpInput.Content newContent(ByteBuffer c)
{
return new Content(c);
}
@Override
public void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("abort {} {}", this, failure);
// Do a direct close of the output, as this may indicate to a client that the
// response is bad either with RST or by abnormal completion of chunked response.
getEndPoint().close();
}
@Override
public boolean isPushSupported()
{
return false;
}
@Override
public void push(org.eclipse.jetty.http.MetaData.Request request)
{
LOG.debug("ignore push in {}", this);
}
public void asyncReadFillInterested()
{
getEndPoint().tryFillInterested(_asyncReadCallback);
}
@Override
public long getBytesIn()
{
return bytesIn.longValue();
}
@Override
public long getBytesOut()
{
return bytesOut.longValue();
}
@Override
public String toConnectionString()
{
return String.format("%s@%x[p=%s,g=%s]=>%s",
getClass().getSimpleName(),
hashCode(),
_parser,
_generator,
_channel);
}
private class Content extends HttpInput.Content
{
public Content(ByteBuffer content)
{
super(content);
_retainableByteBuffer.retain();
}
@Override
public void succeeded()
{
_retainableByteBuffer.release();
}
@Override
public void failed(Throwable x)
{
succeeded();
}
}
private class AsyncReadCallback implements Callback
{
@Override
public void succeeded()
{
if (_channel.getRequest().getHttpInput().onContentProducible())
_channel.handle();
}
@Override
public void failed(Throwable x)
{
if (_channel.failed(x))
_channel.handle();
}
@Override
public InvocationType getInvocationType()
{
// This callback does not block when the HttpInput is in blocking mode,
// rather it wakes up the thread that is blocked waiting on the read;
// but it can if it is in async mode, hence the varying InvocationType.
return _channel.getRequest().getHttpInput().isAsync() ? InvocationType.BLOCKING : InvocationType.NON_BLOCKING;
}
}
private class SendCallback extends IteratingCallback
{
private MetaData.Response _info;
private boolean _head;
private ByteBuffer _content;
private boolean _lastContent;
private Callback _callback;
private ByteBuffer _header;
private ByteBuffer _chunk;
private boolean _shutdownOut;
private SendCallback()
{
super(true);
}
@Override
public InvocationType getInvocationType()
{
return _callback.getInvocationType();
}
private boolean reset(MetaData.Request request, MetaData.Response info, ByteBuffer content, boolean last, Callback callback)
{
if (reset())
{
_info = info;
_head = request != null && HttpMethod.HEAD.is(request.getMethod());
_content = content;
_lastContent = last;
_callback = callback;
_header = null;
_shutdownOut = false;
if (getConnector().isShutdown())
_generator.setPersistent(false);
return true;
}
if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}
@Override
public Action process() throws Exception
{
if (_callback == null)
throw new IllegalStateException();
boolean useDirectByteBuffers = isUseOutputDirectByteBuffers();
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, _chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("generate: {} for {} ({},{},{})@{}",
result,
this,
BufferUtil.toSummaryString(_header),
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_INFO:
throw new EofException("request lifecycle violation");
case NEED_HEADER:
{
_header = _bufferPool.acquire(Math.min(_config.getResponseHeaderSize(), _config.getOutputBufferSize()), useDirectByteBuffers);
continue;
}
case HEADER_OVERFLOW:
{
if (_header.capacity() >= _config.getResponseHeaderSize())
throw new BadMessageException(INTERNAL_SERVER_ERROR_500, "Response header too large");
releaseHeader();
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), useDirectByteBuffers);
continue;
}
case NEED_CHUNK:
{
_chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, useDirectByteBuffers);
continue;
}
case NEED_CHUNK_TRAILER:
{
releaseChunk();
_chunk = _bufferPool.acquire(_config.getResponseHeaderSize(), useDirectByteBuffers);
continue;
}
case FLUSH:
{
// Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
if (_head || _generator.isNoContent())
{
BufferUtil.clear(_chunk);
BufferUtil.clear(_content);
}
byte gatherWrite = 0;
long bytes = 0;
if (BufferUtil.hasContent(_header))
{
gatherWrite += 4;
bytes += _header.remaining();
}
if (BufferUtil.hasContent(_chunk))
{
gatherWrite += 2;
bytes += _chunk.remaining();
}
if (BufferUtil.hasContent(_content))
{
gatherWrite += 1;
bytes += _content.remaining();
}
HttpConnection.this.bytesOut.add(bytes);
switch (gatherWrite)
{
case 7:
getEndPoint().write(this, _header, _chunk, _content);
break;
case 6:
getEndPoint().write(this, _header, _chunk);
break;
case 5:
getEndPoint().write(this, _header, _content);
break;
case 4:
getEndPoint().write(this, _header);
break;
case 3:
getEndPoint().write(this, _chunk, _content);
break;
case 2:
getEndPoint().write(this, _chunk);
break;
case 1:
getEndPoint().write(this, _content);
break;
default:
succeeded();
}
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
_shutdownOut = true;
continue;
}
case DONE:
{
// If this is the end of the response and the connector was shutdown after response was committed,
// we can't add the Connection:close header, but we are still allowed to close the connection
// by shutting down the output.
if (getConnector().isShutdown() && _generator.isEnd() && _generator.isPersistent())
_shutdownOut = true;
return Action.SUCCEEDED;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse=" + result);
}
}
}
}
private Callback release()
{
Callback complete = _callback;
_callback = null;
_info = null;
_content = null;
releaseHeader();
releaseChunk();
return complete;
}
private void releaseHeader()
{
if (_header != null)
_bufferPool.release(_header);
_header = null;
}
private void releaseChunk()
{
if (_chunk != null)
_bufferPool.release(_chunk);
_chunk = null;
}
@Override
protected void onCompleteSuccess()
{
boolean upgrading = _channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE) != null;
release().succeeded();
// If successfully upgraded it is responsibility of the next protocol to close the connection.
if (_shutdownOut && !upgrading)
getEndPoint().shutdownOutput();
}
@Override
public void onCompleteFailure(final Throwable x)
{
failedCallback(release(), x);
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public String toString()
{
return String.format("%s[i=%s,cb=%s]", super.toString(), _info, _callback);
}
}
}

27
pom.xml
View File

@ -32,7 +32,7 @@
<awaitility.version>4.2.0</awaitility.version>
<bndlib.version>6.3.1</bndlib.version>
<build-support.version>1.5</build-support.version>
<checkstyle.version>10.3</checkstyle.version>
<checkstyle.version>10.3.1</checkstyle.version>
<commons-codec.version>1.15</commons-codec.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<conscrypt.version>2.5.2</conscrypt.version>
@ -73,7 +73,7 @@
<kerb-simplekdc.version>2.0.2</kerb-simplekdc.version>
<log4j2.version>2.17.2</log4j2.version>
<logback.version>1.3.0-alpha16</logback.version>
<mariadb.version>3.0.5</mariadb.version>
<mariadb.version>3.0.6</mariadb.version>
<mariadb.docker.version>10.3.6</mariadb.docker.version>
<maven.deps.version>3.8.4</maven.deps.version>
<maven-artifact-transfer.version>0.13.1</maven-artifact-transfer.version>
@ -116,7 +116,7 @@
<maven.dependency.plugin.version>3.2.0</maven.dependency.plugin.version>
<maven.deploy.plugin.version>3.0.0-M2</maven.deploy.plugin.version>
<maven.eclipse.plugin.version>2.10</maven.eclipse.plugin.version>
<maven.enforcer.plugin.version>3.0.0</maven.enforcer.plugin.version>
<maven.enforcer.plugin.version>3.1.0</maven.enforcer.plugin.version>
<maven.exec.plugin.version>3.0.0</maven.exec.plugin.version>
<maven.gpg.plugin.version>3.0.1</maven.gpg.plugin.version>
<maven.install.plugin.version>3.0.0-M1</maven.install.plugin.version>
@ -1414,11 +1414,32 @@
<artifactId>jetty-memcached-sessions</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-alpn</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-boot</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-boot-jsp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-boot-warurl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-httpservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-client</artifactId>