diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java index 129f82f1f8e..4192c448e23 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/LeakTrackingByteBufferPool.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.LeakDetector; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; @@ -37,7 +38,8 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By LeakTrackingByteBufferPool.this.leaked(leakInfo); } }; - + + private final static boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY"); private final ByteBufferPool delegate; public LeakTrackingByteBufferPool(ByteBufferPool delegate) @@ -50,9 +52,11 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By @Override public ByteBuffer acquire(int size, boolean direct) { - ByteBuffer buffer = delegate.acquire(size, direct); - if (!leakDetector.acquired(buffer)) - LOG.warn("ByteBuffer {}@{} not tracked", buffer, System.identityHashCode(buffer)); + ByteBuffer buffer = delegate.acquire(size,direct); + boolean leakd = leakDetector.acquired(buffer); + if (NOISY || !leakd) + LOG.info(String.format("ByteBuffer acquire %s leakd.acquired=%s",BufferUtil.toIDString(buffer),leakd ? "normal" : "LEAK"), + new Throwable("LeakStack.Acquire")); return buffer; } @@ -61,13 +65,15 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By { if (buffer == null) return; - if (!leakDetector.released(buffer)) - LOG.warn("ByteBuffer {}@{} released but not acquired", buffer, System.identityHashCode(buffer)); + boolean leakd = leakDetector.released(buffer); + if (NOISY || !leakd) + LOG.info(String.format("ByteBuffer release %s leakd.released=%s",BufferUtil.toIDString(buffer),leakd ? "normal" : "LEAK"), + new Throwable("LeakStack.Release")); delegate.release(buffer); } protected void leaked(LeakDetector.LeakInfo leakInfo) { - LOG.warn("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames()); + LOG.warn("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:",leakInfo.getStackFrames()); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java index 7306e2fe340..84073409ced 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java @@ -23,6 +23,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.BufferUtil; @@ -56,13 +57,24 @@ public class MappedByteBufferPool implements ByteBufferPool if (result == null) { int capacity = bucket * factor; - result = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity); + result = direct ? createDirect(capacity) : createInDirect(capacity); } BufferUtil.clear(result); return result; } + protected ByteBuffer createDirect(int capacity) + { + return BufferUtil.allocateDirect(capacity); + } + + protected ByteBuffer createInDirect(int capacity) + { + return BufferUtil.allocate(capacity); + } + + @Override public void release(ByteBuffer buffer) { @@ -108,4 +120,25 @@ public class MappedByteBufferPool implements ByteBufferPool { return direct ? directBuffers : heapBuffers; } + + static AtomicInteger __tag = new AtomicInteger(); + public static class Tagged extends MappedByteBufferPool + { + protected ByteBuffer createInDirect(int capacity) + { + ByteBuffer buffer = BufferUtil.allocate(capacity+4); + buffer.limit(4); + buffer.putInt(0,__tag.incrementAndGet()); + buffer.position(4); + buffer.limit(buffer.capacity()); + ByteBuffer slice = buffer.slice(); + BufferUtil.clear(slice); + return slice; + } + + protected ByteBuffer createDirect(int capacity) + { + return createInDirect(capacity); + } + } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java index cf64c2394b0..3f491573ff0 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.io; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; @@ -29,7 +30,9 @@ import java.nio.ByteBuffer; import java.util.Queue; import java.util.concurrent.ConcurrentMap; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; +import org.hamcrest.Matchers; import org.junit.Test; public class MappedByteBufferPoolTest @@ -113,4 +116,16 @@ public class MappedByteBufferPoolTest // Expected path. } } + + @Test + public void testTagged() + { + MappedByteBufferPool pool = new MappedByteBufferPool.Tagged(); + + ByteBuffer buffer = pool.acquire(1024,false); + + assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000001")); + buffer = pool.acquire(1024,false); + assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002")); + } } diff --git a/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/Async502Loop.java b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/Async502Loop.java new file mode 100644 index 00000000000..22cf86e09d5 --- /dev/null +++ b/jetty-proxy/src/test/java/org/eclipse/jetty/proxy/Async502Loop.java @@ -0,0 +1,241 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.proxy; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpProxy; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.proxy.AbstractProxyServlet; +import org.eclipse.jetty.proxy.AsyncProxyServlet; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.QueuedThreadPool; + +public class Async502Loop +{ + private static final Logger LOG = Log.getLogger(Async502Loop.class); + private static final String PROXIED_HEADER = "X-Proxied"; + + public static void main(String[] args) + { + try + { + new Async502Loop().loop(); + } + catch (Throwable t) + { + t.printStackTrace(System.err); + } + } + + private HttpClient client; + private Server proxy; + private ServerConnector proxyConnector; + private ServletContextHandler proxyContext; + private AbstractProxyServlet proxyServlet; + private Server server; + private ServerConnector serverConnector; + + public Async502Loop() + { + proxyServlet = new AsyncProxyServlet(); + // proxyServlet = new AsyncMiddleManServlet(); + } + + private void startServer(HttpServlet servlet) throws Exception + { + QueuedThreadPool serverPool = new QueuedThreadPool(); + serverPool.setName("server"); + server = new Server(serverPool); + serverConnector = new ServerConnector(server); + server.addConnector(serverConnector); + + ServletContextHandler appCtx = new ServletContextHandler(server,"/",true,false); + ServletHolder appServletHolder = new ServletHolder(servlet); + appCtx.addServlet(appServletHolder,"/*"); + + server.start(); + } + + private void startProxy() throws Exception + { + startProxy(new HashMap()); + } + + private void startProxy(Map initParams) throws Exception + { + QueuedThreadPool proxyPool = new QueuedThreadPool(); + proxyPool.setName("proxy"); + proxy = new Server(proxyPool); + + HttpConfiguration configuration = new HttpConfiguration(); + configuration.setSendDateHeader(false); + configuration.setSendServerVersion(false); + String value = initParams.get("outputBufferSize"); + if (value != null) + configuration.setOutputBufferSize(Integer.valueOf(value)); + proxyConnector = new ServerConnector(proxy,new HttpConnectionFactory(configuration)); + proxy.addConnector(proxyConnector); + + proxyContext = new ServletContextHandler(proxy,"/",true,false); + ServletHolder proxyServletHolder = new ServletHolder(proxyServlet); + proxyServletHolder.setInitParameters(initParams); + proxyContext.addServlet(proxyServletHolder,"/*"); + + proxy.start(); + } + + private void startClient() throws Exception + { + client = prepareClient(); + } + + private HttpClient prepareClient() throws Exception + { + QueuedThreadPool clientPool = new QueuedThreadPool(); + clientPool.setName("client"); + HttpClient result = new HttpClient(); + result.setExecutor(clientPool); + result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost",proxyConnector.getLocalPort())); + result.start(); + return result; + } + + public void dispose() throws Exception + { + client.stop(); + proxy.stop(); + server.stop(); + } + + private static class ClientLoop implements Runnable + { + private final CountDownLatch active; + private final AtomicBoolean ok; + private final HttpClient client; + private final String host; + private final int port; + + public ClientLoop(CountDownLatch activeClientLatch, AtomicBoolean atomicOk, HttpClient client, String serverHost, int serverPort) + { + this.active = activeClientLatch; + this.ok = atomicOk; + this.client = client; + this.host = serverHost; + this.port = serverPort; + } + + @Override + public void run() + { + String threadName = Thread.currentThread().getName(); + LOG.info("Starting thread {}",threadName); + try + { + while (ok.get()) + { + byte[] content = new byte[1024]; + new Random().nextBytes(content); + ContentResponse response = client.newRequest(host,port).method(HttpMethod.POST).content(new BytesContentProvider(content)) + .timeout(5,TimeUnit.SECONDS).send(); + + if (response.getStatus() != 200) + { + LOG.warn("Got response <{}>, expecting <{}>",response.getStatus(),200); + // allow all ClientLoops to finish + ok.set(false); + } + } + } + catch (InterruptedException | TimeoutException | ExecutionException e) + { + LOG.warn("Error processing request",e); + ok.set(false); + } + finally + { + LOG.info("Shutting down thread {}",threadName); + active.countDown(); + } + } + } + + @SuppressWarnings("serial") + private void loop() throws Exception + { + startServer(new HttpServlet() + { + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + if (req.getHeader("Via") != null) + resp.addHeader(PROXIED_HEADER,"true"); + IO.copy(req.getInputStream(),resp.getOutputStream()); + } + }); + startProxy(); + startClient(); + + // Number of clients to simulate + int clientCount = 5; + // Latch for number of clients still active (used to terminate test) + final CountDownLatch activeClientLatch = new CountDownLatch(clientCount); + // Atomic Boolean to track that its OK to still continue looping. + // When this goes false, that means one of the client threads has + // encountered an error condition, and should allow all remaining + // client threads to finish cleanly. + final AtomicBoolean atomicOk = new AtomicBoolean(true); + + // Start clients + for (int i = 0; i < clientCount; i++) + { + ClientLoop r = new ClientLoop(activeClientLatch,atomicOk,client,"localhost",serverConnector.getLocalPort()); + String name = "client-" + i; + Thread thread = new Thread(r,name); + thread.start(); + } + + activeClientLatch.await(); + dispose(); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ShutdownMonitor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ShutdownMonitor.java index bb9cff00a37..d2da9a9a637 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ShutdownMonitor.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ShutdownMonitor.java @@ -23,6 +23,7 @@ import java.io.InputStreamReader; import java.io.LineNumberReader; import java.io.OutputStream; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.StandardCharsets; @@ -294,7 +295,9 @@ public class ShutdownMonitor try { - serverSocket = new ServerSocket(port,1,InetAddress.getByName("127.0.0.1")); + serverSocket = new ServerSocket(); + serverSocket.setReuseAddress(true); + serverSocket.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), port), 1); if (port == 0) { // server assigned port in use diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index 3d7957a3108..842e2011c70 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -939,6 +939,43 @@ public class BufferUtil return builder.toString(); } + + + /* ------------------------------------------------------------ */ + /** Convert Buffer to string ID independent of content + * @param buffer + * @return A string showing the buffer ID + */ + public static void idString(ByteBuffer buffer, StringBuilder out) + { + out.append(buffer.getClass().getSimpleName()); + out.append("@"); + if (buffer.hasArray() && buffer.arrayOffset()==4) + { + out.append('T'); + byte[] array = buffer.array(); + TypeUtil.toHex(array[0],out); + TypeUtil.toHex(array[1],out); + TypeUtil.toHex(array[2],out); + TypeUtil.toHex(array[3],out); + } + else + out.append(Integer.toHexString(System.identityHashCode(buffer))); + } + + /* ------------------------------------------------------------ */ + /** Convert Buffer to string ID independent of content + * @param buffer + * @return A string showing the buffer ID + */ + public static String toIDString(ByteBuffer buffer) + { + StringBuilder buf = new StringBuilder(); + idString(buffer,buf); + return buf.toString(); + } + + /* ------------------------------------------------------------ */ /** Convert Buffer to a detail debug string of pointers and content * @param buffer @@ -950,9 +987,7 @@ public class BufferUtil return "null"; StringBuilder buf = new StringBuilder(); - buf.append(buffer.getClass().getSimpleName()); - buf.append("@"); - buf.append(Integer.toHexString(System.identityHashCode(buffer))); + idString(buffer,buf); buf.append("[p="); buf.append(buffer.position()); buf.append(",l="); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java b/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java index 924fb1603fb..77851d9dd9b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/LeakDetector.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.util; import java.lang.ref.PhantomReference; import java.lang.ref.ReferenceQueue; -import java.lang.ref.WeakReference; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,42 +30,32 @@ import org.eclipse.jetty.util.log.Logger; /** * A facility to detect improper usage of resource pools. *

- * Resource pools usually have a method to acquire a pooled resource - * and a method to released it back to the pool. + * Resource pools usually have a method to acquire a pooled resource and a method to released it back to the pool. *

- * To detect if client code acquires a resource but never releases it, - * the resource pool can be modified to use a {@link LeakDetector}. - * The modified resource pool should call {@link #acquired(Object)} every time - * the method to acquire a resource is called, and {@link #released(Object)} - * every time the method to release the resource is called. + * To detect if client code acquires a resource but never releases it, the resource pool can be modified to use a + * {@link LeakDetector}. The modified resource pool should call {@link #acquired(Object)} every time the method to + * acquire a resource is called, and {@link #released(Object)} every time the method to release the resource is called. * {@link LeakDetector} keeps track of these resources and invokes method - * {@link #leaked(org.eclipse.jetty.util.LeakDetector.LeakInfo)} when it detects that a resource - * has been leaked (that is, acquired but never released). + * {@link #leaked(org.eclipse.jetty.util.LeakDetector.LeakInfo)} when it detects that a resource has been leaked (that + * is, acquired but never released). *

- * To detect whether client code releases a resource without having - * acquired it, the resource pool can be modified to check the return value - * of {@link #released(Object)}: if false, it means that the resource was - * not acquired. + * To detect whether client code releases a resource without having acquired it, the resource pool can be modified to + * check the return value of {@link #released(Object)}: if false, it means that the resource was not acquired. *

* IMPLEMENTATION NOTES *

- * This class relies on {@link System#identityHashCode(Object)} to create - * a unique id for each resource passed to {@link #acquired(Object)} and - * {@link #released(Object)}. {@link System#identityHashCode(Object)} does - * not guarantee that it will not generate the same number for different - * objects, but in practice the chance of collision is rare. + * This class relies on {@link System#identityHashCode(Object)} to create a unique id for each resource passed to + * {@link #acquired(Object)} and {@link #released(Object)}. {@link System#identityHashCode(Object)} does not guarantee + * that it will not generate the same number for different objects, but in practice the chance of collision is rare. *

- * {@link LeakDetector} uses {@link PhantomReference}s to detect leaks. - * {@link PhantomReference}s are enqueued in their {@link ReferenceQueue} - * after they have been garbage collected (differently from - * {@link WeakReference}s that are enqueued before). - * Since the resource is now garbage collected, {@link LeakDetector} checks - * whether it has been released and if not, it reports a leak. - * Using {@link PhantomReference}s is better than overriding {@link #finalize()} - * and works also in those cases where {@link #finalize()} is not - * overridable. + * {@link LeakDetector} uses {@link PhantomReference}s to detect leaks. {@link PhantomReference}s are enqueued in their + * {@link ReferenceQueue} after they have been garbage collected (differently from {@link WeakReference}s that + * are enqueued before). Since the resource is now garbage collected, {@link LeakDetector} checks whether it + * has been released and if not, it reports a leak. Using {@link PhantomReference}s is better than overriding + * {@link #finalize()} and works also in those cases where {@link #finalize()} is not overridable. * - * @param the resource type. + * @param + * the resource type. */ public class LeakDetector extends AbstractLifeCycle implements Runnable { @@ -79,33 +68,54 @@ public class LeakDetector extends AbstractLifeCycle implements Runnable /** * Tracks the resource as been acquired. * - * @param resource the resource that has been acquired - * @return whether the resource has been tracked + * @param resource + * the resource that has been acquired + * @return true whether the resource has been acquired normally, false if the resource has detected a leak (meaning + * that another acquire occurred before a release of the same resource) * @see #released(Object) */ public boolean acquired(T resource) { String id = id(resource); - return resources.putIfAbsent(id, new LeakInfo(resource, id)) == null; + LeakInfo info = resources.putIfAbsent(id,new LeakInfo(resource,id)); + if (info != null) + { + // leak detected, prior acquire exists (not released) + LOG.warn("Prior Acquire from Stack",info.getStackFrames()); + return false; + } + // normal behavior + return true; } /** * Tracks the resource as been released. * - * @param resource the resource that has been released - * @return whether the resource has been acquired + * @param resource + * the resource that has been released + * @return true whether the resource has been released normally (based on a previous acquire). false if the resource + * has been released without a prior acquire (such as a double release scenario) * @see #acquired(Object) */ public boolean released(T resource) { String id = id(resource); - return resources.remove(id) != null; + LeakInfo info = resources.remove(id); + if (info != null) + { + // normal path + return true; + } + + // leak detected (released without acquire) + return false; } /** * Generates a unique ID for the given resource. * - * @param resource the resource to generate the unique ID for + * @param resource + * the resource to generate the unique ID for * @return the unique ID of the given resource */ protected String id(T resource) @@ -117,7 +127,7 @@ public class LeakDetector extends AbstractLifeCycle implements Runnable protected void doStart() throws Exception { super.doStart(); - thread = new Thread(this, getClass().getSimpleName()); + thread = new Thread(this,getClass().getSimpleName()); thread.setDaemon(true); thread.start(); } @@ -139,7 +149,7 @@ public class LeakDetector extends AbstractLifeCycle implements Runnable @SuppressWarnings("unchecked") LeakInfo leakInfo = (LeakInfo)queue.remove(); if (LOG.isDebugEnabled()) - LOG.debug("Resource GC'ed: {}", leakInfo); + LOG.debug("Resource GC'ed: {}",leakInfo); if (resources.remove(leakInfo.id) != null) leaked(leakInfo); } @@ -153,11 +163,12 @@ public class LeakDetector extends AbstractLifeCycle implements Runnable /** * Callback method invoked by {@link LeakDetector} when it detects that a resource has been leaked. * - * @param leakInfo the information about the leak + * @param leakInfo + * the information about the leak */ protected void leaked(LeakInfo leakInfo) { - LOG.warn("Resource leaked: " + leakInfo.description, leakInfo.stackFrames); + LOG.warn("Resource leaked: " + leakInfo.description,leakInfo.stackFrames); } /** @@ -171,7 +182,7 @@ public class LeakDetector extends AbstractLifeCycle implements Runnable private LeakInfo(T referent, String id) { - super(referent, queue); + super(referent,queue); this.id = id; this.description = referent.toString(); this.stackFrames = new Throwable();