From 17c36497714ed7d9ba5bb93ee82662c0f97fcbd3 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 16 Aug 2023 15:26:57 +0200 Subject: [PATCH] Fixes #10217 - Review ProxyConnectionFactory buffer management. (#10225) Fixed buffer leak in ProxyConnection classes. Introduced ArrayByteBufferPool.Tracking to test buffer leaks. Signed-off-by: Simone Bordet --- .../client/HttpClientProxyProtocolTest.java | 20 ++- .../test/resources/jetty-logging.properties | 1 + .../eclipse/jetty/io/ArrayByteBufferPool.java | 114 ++++++++++++++++++ .../jetty/server/ProxyConnectionFactory.java | 59 +++------ .../jetty/server/MultiPartByteRangesTest.java | 35 +----- 5 files changed, 153 insertions(+), 76 deletions(-) diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientProxyProtocolTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientProxyProtocolTest.java index 89928e257df..30a995f3a3b 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientProxyProtocolTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientProxyProtocolTest.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.client; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import org.eclipse.jetty.client.transport.HttpDestination; @@ -23,6 +24,7 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MimeTypes; +import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Handler; @@ -33,6 +35,7 @@ import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -46,15 +49,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class HttpClientProxyProtocolTest { + private ArrayByteBufferPool.Tracking serverBufferPool; private Server server; private ServerConnector connector; + private ArrayByteBufferPool.Tracking clientBufferPool; private HttpClient client; private void startServer(Handler handler) throws Exception { QueuedThreadPool serverThreads = new QueuedThreadPool(); serverThreads.setName("server"); - server = new Server(serverThreads); + serverBufferPool = new ArrayByteBufferPool.Tracking(); + server = new Server(serverThreads, null, serverBufferPool); HttpConnectionFactory http = new HttpConnectionFactory(); ProxyConnectionFactory proxy = new ProxyConnectionFactory(http.getProtocol()); connector = new ServerConnector(server, 1, 1, proxy, http); @@ -67,18 +73,22 @@ public class HttpClientProxyProtocolTest { QueuedThreadPool clientThreads = new QueuedThreadPool(); clientThreads.setName("client"); + clientBufferPool = new ArrayByteBufferPool.Tracking(); client = new HttpClient(); client.setExecutor(clientThreads); + client.setByteBufferPool(clientBufferPool); client.start(); } @AfterEach public void dispose() throws Exception { - if (server != null) - server.stop(); - if (client != null) - client.stop(); + LifeCycle.stop(client); + LifeCycle.stop(server); + Set serverLeaks = serverBufferPool.getLeaks(); + assertEquals(0, serverLeaks.size(), serverBufferPool.dumpLeaks()); + Set clientLeaks = clientBufferPool.getLeaks(); + assertEquals(0, clientLeaks.size(), clientBufferPool.dumpLeaks()); } @Test diff --git a/jetty-core/jetty-client/src/test/resources/jetty-logging.properties b/jetty-core/jetty-client/src/test/resources/jetty-logging.properties index e194e90d87c..90423896e45 100644 --- a/jetty-core/jetty-client/src/test/resources/jetty-logging.properties +++ b/jetty-core/jetty-client/src/test/resources/jetty-logging.properties @@ -1,5 +1,6 @@ #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.client.LEVEL=DEBUG +#org.eclipse.jetty.io.ArrayByteBufferPool$Tracking.LEVEL=DEBUG #org.eclipse.jetty.io.SocketChannelEndPoint.LEVEL=DEBUG #org.eclipse.jetty.io.ssl.LEVEL=DEBUG #org.eclipse.jetty.http.LEVEL=DEBUG diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index ff914e0590d..2d906197bf9 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -14,11 +14,17 @@ package org.eclipse.jetty.io; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.IntUnaryOperator; +import java.util.stream.Collectors; import org.eclipse.jetty.io.internal.CompoundPool; import org.eclipse.jetty.io.internal.QueuedPool; @@ -564,4 +570,112 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable ); } } + + /** + *

A variant of {@link ArrayByteBufferPool} that tracks buffer + * acquires/releases, useful to identify buffer leaks.

+ *

Use {@link #getLeaks()} when the system is idle to get + * the {@link Buffer}s that have been leaked, which contain + * the stack trace information of where the buffer was acquired.

+ */ + public static class Tracking extends ArrayByteBufferPool + { + private static final Logger LOG = LoggerFactory.getLogger(Tracking.class); + + private final Set buffers = ConcurrentHashMap.newKeySet(); + + public Tracking() + { + this(0, -1, Integer.MAX_VALUE); + } + + public Tracking(int minCapacity, int maxCapacity, int maxBucketSize) + { + this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L); + } + + public Tracking(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) + { + super(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory); + } + + @Override + public RetainableByteBuffer acquire(int size, boolean direct) + { + RetainableByteBuffer buffer = super.acquire(size, direct); + Buffer wrapper = new Buffer(buffer, size); + if (LOG.isDebugEnabled()) + LOG.debug("acquired {}", wrapper); + buffers.add(wrapper); + return wrapper; + } + + public Set getLeaks() + { + return buffers; + } + + public String dumpLeaks() + { + return getLeaks().stream() + .map(Buffer::dump) + .collect(Collectors.joining(System.lineSeparator())); + } + + public class Buffer extends RetainableByteBuffer.Wrapper + { + private final int size; + private final Instant acquireInstant; + private final Throwable acquireStack; + + private Buffer(RetainableByteBuffer wrapped, int size) + { + super(wrapped); + this.size = size; + this.acquireInstant = Instant.now(); + this.acquireStack = new Throwable(); + } + + public int getSize() + { + return size; + } + + public Instant getAcquireInstant() + { + return acquireInstant; + } + + public Throwable getAcquireStack() + { + return acquireStack; + } + + @Override + public boolean release() + { + boolean released = super.release(); + if (released) + { + buffers.remove(this); + if (LOG.isDebugEnabled()) + LOG.debug("released {}", this); + } + return released; + } + + public String dump() + { + StringWriter w = new StringWriter(); + getAcquireStack().printStackTrace(new PrintWriter(w)); + return "%s of %d bytes on %s at %s".formatted(getClass().getSimpleName(), getSize(), getAcquireInstant(), w); + } + + @Override + public String toString() + { + return "%s@%x[%s]".formatted(getClass().getSimpleName(), hashCode(), super.toString()); + } + } + } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index d2cc7560728..548db964aa3 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; *

This factory can be placed in front of any other connection factory * to process the proxy v1 or v2 line before the normal protocol handling

* - * @see http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt + * @see PROXY protocol */ public class ProxyConnectionFactory extends DetectorConnectionFactory { @@ -245,6 +245,7 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory _buffer.release(); return unconsumed; } + _buffer.release(); return null; } @@ -564,6 +565,7 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory _buffer.release(); return unconsumed; } + _buffer.release(); return null; } @@ -591,7 +593,7 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory SocketAddress remote; switch (_family) { - case INET: + case INET -> { byte[] addr = new byte[4]; byteBuffer.get(addr); @@ -602,9 +604,8 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory int dstPort = byteBuffer.getChar(); local = new InetSocketAddress(dstAddr, dstPort); remote = new InetSocketAddress(srcAddr, srcPort); - break; } - case INET6: + case INET6 -> { byte[] addr = new byte[16]; byteBuffer.get(addr); @@ -615,9 +616,8 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory int dstPort = byteBuffer.getChar(); local = new InetSocketAddress(dstAddr, dstPort); remote = new InetSocketAddress(srcAddr, srcPort); - break; } - case UNIX: + case UNIX -> { byte[] addr = new byte[108]; byteBuffer.get(addr); @@ -626,12 +626,8 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory String dst = UnixDomain.toPath(addr); local = UnixDomain.newSocketAddress(dst); remote = UnixDomain.newSocketAddress(src); - break; - } - default: - { - throw new IllegalStateException("Unsupported family " + _family); } + default -> throw new IllegalStateException("Unsupported family " + _family); } proxyEndPoint = new ProxyEndPoint(endPoint, local, remote); @@ -714,37 +710,20 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory int transportAndFamily = 0xFF & byteBuffer.get(); switch (transportAndFamily >> 4) { - case 0: - _family = Family.UNSPEC; - break; - case 1: - _family = Family.INET; - break; - case 2: - _family = Family.INET6; - break; - case 3: - _family = Family.UNIX; - break; - default: - throw new IOException("Proxy v2 bad PROXY family"); + case 0 -> _family = Family.UNSPEC; + case 1 -> _family = Family.INET; + case 2 -> _family = Family.INET6; + case 3 -> _family = Family.UNIX; + default -> throw new IOException("Proxy v2 bad PROXY family"); } - Transport transport; - switch (transportAndFamily & 0xF) + Transport transport = switch (transportAndFamily & 0xF) { - case 0: - transport = Transport.UNSPEC; - break; - case 1: - transport = Transport.STREAM; - break; - case 2: - transport = Transport.DGRAM; - break; - default: - throw new IOException("Proxy v2 bad PROXY family"); - } + case 0 -> Transport.UNSPEC; + case 1 -> Transport.STREAM; + case 2 -> Transport.DGRAM; + default -> throw new IOException("Proxy v2 bad PROXY family"); + }; _length = byteBuffer.getChar(); @@ -761,6 +740,8 @@ public class ProxyConnectionFactory extends DetectorConnectionFactory private void releaseAndClose() { + if (LOG.isDebugEnabled()) + LOG.debug("Proxy v2 releasing buffer and closing"); _buffer.release(); close(); } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartByteRangesTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartByteRangesTest.java index 745de4c3ada..8c138113014 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartByteRangesTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MultiPartByteRangesTest.java @@ -19,7 +19,6 @@ import java.nio.channels.SocketChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.http.ByteRange; import org.eclipse.jetty.http.HttpField; @@ -31,7 +30,6 @@ import org.eclipse.jetty.http.MultiPart; import org.eclipse.jetty.http.MultiPartByteRanges; import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.Content; -import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.io.content.ByteBufferContentSource; import org.eclipse.jetty.toolchain.test.FS; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; @@ -49,13 +47,13 @@ public class MultiPartByteRangesTest { private Server server; private ServerConnector connector; - private LeakTrackingBufferPool byteBufferPool; + private ArrayByteBufferPool.Tracking byteBufferPool; private void start(Handler handler) throws Exception { QueuedThreadPool serverThreads = new QueuedThreadPool(); serverThreads.setName("server"); - byteBufferPool = new LeakTrackingBufferPool(); + byteBufferPool = new ArrayByteBufferPool.Tracking(); server = new Server(serverThreads, null, byteBufferPool); connector = new ServerConnector(server, 1, 1); server.addConnector(connector); @@ -67,7 +65,7 @@ public class MultiPartByteRangesTest public void dispose() { LifeCycle.stop(server); - assertEquals(0, byteBufferPool.countLeaks()); + assertEquals(0, byteBufferPool.getLeaks().size()); } @Test @@ -131,31 +129,4 @@ public class MultiPartByteRangesTest assertEquals("CDEF", Content.Source.asString(part3.getContentSource())); } } - - private static class LeakTrackingBufferPool extends ArrayByteBufferPool - { - private final AtomicInteger leaks = new AtomicInteger(); - - public int countLeaks() - { - return leaks.get(); - } - - @Override - public RetainableByteBuffer acquire(int size, boolean direct) - { - leaks.incrementAndGet(); - return new RetainableByteBuffer.Wrapper(super.acquire(size, direct)) - { - @Override - public boolean release() - { - boolean released = super.release(); - if (released) - leaks.decrementAndGet(); - return released; - } - }; - } - } }