Merge remote-tracking branch 'origin/jetty-9.2.x'

Conflicts:
	jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java
This commit is contained in:
Greg Wilkins 2015-03-06 12:45:55 +11:00
commit 9b5205ba40
7 changed files with 397 additions and 53 deletions

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
@ -37,7 +38,8 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
LeakTrackingByteBufferPool.this.leaked(leakInfo);
}
};
private final static boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY");
private final ByteBufferPool delegate;
public LeakTrackingByteBufferPool(ByteBufferPool delegate)
@ -50,9 +52,11 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer buffer = delegate.acquire(size, direct);
if (!leakDetector.acquired(buffer))
LOG.warn("ByteBuffer {}@{} not tracked", buffer, System.identityHashCode(buffer));
ByteBuffer buffer = delegate.acquire(size,direct);
boolean leakd = leakDetector.acquired(buffer);
if (NOISY || !leakd)
LOG.info(String.format("ByteBuffer acquire %s leakd.acquired=%s",BufferUtil.toIDString(buffer),leakd ? "normal" : "LEAK"),
new Throwable("LeakStack.Acquire"));
return buffer;
}
@ -61,13 +65,15 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
{
if (buffer == null)
return;
if (!leakDetector.released(buffer))
LOG.warn("ByteBuffer {}@{} released but not acquired", buffer, System.identityHashCode(buffer));
boolean leakd = leakDetector.released(buffer);
if (NOISY || !leakd)
LOG.info(String.format("ByteBuffer release %s leakd.released=%s",BufferUtil.toIDString(buffer),leakd ? "normal" : "LEAK"),
new Throwable("LeakStack.Release"));
delegate.release(buffer);
}
protected void leaked(LeakDetector<ByteBuffer>.LeakInfo leakInfo)
{
LOG.warn("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
LOG.warn("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:",leakInfo.getStackFrames());
}
}

View File

@ -23,6 +23,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
@ -56,13 +57,24 @@ public class MappedByteBufferPool implements ByteBufferPool
if (result == null)
{
int capacity = bucket * factor;
result = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
result = direct ? createDirect(capacity) : createInDirect(capacity);
}
BufferUtil.clear(result);
return result;
}
protected ByteBuffer createDirect(int capacity)
{
return BufferUtil.allocateDirect(capacity);
}
protected ByteBuffer createInDirect(int capacity)
{
return BufferUtil.allocate(capacity);
}
@Override
public void release(ByteBuffer buffer)
{
@ -108,4 +120,25 @@ public class MappedByteBufferPool implements ByteBufferPool
{
return direct ? directBuffers : heapBuffers;
}
static AtomicInteger __tag = new AtomicInteger();
public static class Tagged extends MappedByteBufferPool
{
protected ByteBuffer createInDirect(int capacity)
{
ByteBuffer buffer = BufferUtil.allocate(capacity+4);
buffer.limit(4);
buffer.putInt(0,__tag.incrementAndGet());
buffer.position(4);
buffer.limit(buffer.capacity());
ByteBuffer slice = buffer.slice();
BufferUtil.clear(slice);
return slice;
}
protected ByteBuffer createDirect(int capacity)
{
return createInDirect(capacity);
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
@ -29,7 +30,9 @@ import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.hamcrest.Matchers;
import org.junit.Test;
public class MappedByteBufferPoolTest
@ -113,4 +116,16 @@ public class MappedByteBufferPoolTest
// Expected path.
}
}
@Test
public void testTagged()
{
MappedByteBufferPool pool = new MappedByteBufferPool.Tagged();
ByteBuffer buffer = pool.acquire(1024,false);
assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000001"));
buffer = pool.acquire(1024,false);
assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002"));
}
}

View File

@ -0,0 +1,241 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.proxy.AbstractProxyServlet;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public class Async502Loop
{
private static final Logger LOG = Log.getLogger(Async502Loop.class);
private static final String PROXIED_HEADER = "X-Proxied";
public static void main(String[] args)
{
try
{
new Async502Loop().loop();
}
catch (Throwable t)
{
t.printStackTrace(System.err);
}
}
private HttpClient client;
private Server proxy;
private ServerConnector proxyConnector;
private ServletContextHandler proxyContext;
private AbstractProxyServlet proxyServlet;
private Server server;
private ServerConnector serverConnector;
public Async502Loop()
{
proxyServlet = new AsyncProxyServlet();
// proxyServlet = new AsyncMiddleManServlet();
}
private void startServer(HttpServlet servlet) throws Exception
{
QueuedThreadPool serverPool = new QueuedThreadPool();
serverPool.setName("server");
server = new Server(serverPool);
serverConnector = new ServerConnector(server);
server.addConnector(serverConnector);
ServletContextHandler appCtx = new ServletContextHandler(server,"/",true,false);
ServletHolder appServletHolder = new ServletHolder(servlet);
appCtx.addServlet(appServletHolder,"/*");
server.start();
}
private void startProxy() throws Exception
{
startProxy(new HashMap<String, String>());
}
private void startProxy(Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
proxy = new Server(proxyPool);
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
String value = initParams.get("outputBufferSize");
if (value != null)
configuration.setOutputBufferSize(Integer.valueOf(value));
proxyConnector = new ServerConnector(proxy,new HttpConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
proxyContext = new ServletContextHandler(proxy,"/",true,false);
ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
proxyServletHolder.setInitParameters(initParams);
proxyContext.addServlet(proxyServletHolder,"/*");
proxy.start();
}
private void startClient() throws Exception
{
client = prepareClient();
}
private HttpClient prepareClient() throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
HttpClient result = new HttpClient();
result.setExecutor(clientPool);
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost",proxyConnector.getLocalPort()));
result.start();
return result;
}
public void dispose() throws Exception
{
client.stop();
proxy.stop();
server.stop();
}
private static class ClientLoop implements Runnable
{
private final CountDownLatch active;
private final AtomicBoolean ok;
private final HttpClient client;
private final String host;
private final int port;
public ClientLoop(CountDownLatch activeClientLatch, AtomicBoolean atomicOk, HttpClient client, String serverHost, int serverPort)
{
this.active = activeClientLatch;
this.ok = atomicOk;
this.client = client;
this.host = serverHost;
this.port = serverPort;
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
LOG.info("Starting thread {}",threadName);
try
{
while (ok.get())
{
byte[] content = new byte[1024];
new Random().nextBytes(content);
ContentResponse response = client.newRequest(host,port).method(HttpMethod.POST).content(new BytesContentProvider(content))
.timeout(5,TimeUnit.SECONDS).send();
if (response.getStatus() != 200)
{
LOG.warn("Got response <{}>, expecting <{}>",response.getStatus(),200);
// allow all ClientLoops to finish
ok.set(false);
}
}
}
catch (InterruptedException | TimeoutException | ExecutionException e)
{
LOG.warn("Error processing request",e);
ok.set(false);
}
finally
{
LOG.info("Shutting down thread {}",threadName);
active.countDown();
}
}
}
@SuppressWarnings("serial")
private void loop() throws Exception
{
startServer(new HttpServlet()
{
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
if (req.getHeader("Via") != null)
resp.addHeader(PROXIED_HEADER,"true");
IO.copy(req.getInputStream(),resp.getOutputStream());
}
});
startProxy();
startClient();
// Number of clients to simulate
int clientCount = 5;
// Latch for number of clients still active (used to terminate test)
final CountDownLatch activeClientLatch = new CountDownLatch(clientCount);
// Atomic Boolean to track that its OK to still continue looping.
// When this goes false, that means one of the client threads has
// encountered an error condition, and should allow all remaining
// client threads to finish cleanly.
final AtomicBoolean atomicOk = new AtomicBoolean(true);
// Start clients
for (int i = 0; i < clientCount; i++)
{
ClientLoop r = new ClientLoop(activeClientLatch,atomicOk,client,"localhost",serverConnector.getLocalPort());
String name = "client-" + i;
Thread thread = new Thread(r,name);
thread.start();
}
activeClientLatch.await();
dispose();
}
}

View File

@ -23,6 +23,7 @@ import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
@ -294,7 +295,9 @@ public class ShutdownMonitor
try
{
serverSocket = new ServerSocket(port,1,InetAddress.getByName("127.0.0.1"));
serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), port), 1);
if (port == 0)
{
// server assigned port in use

View File

@ -939,6 +939,43 @@ public class BufferUtil
return builder.toString();
}
/* ------------------------------------------------------------ */
/** Convert Buffer to string ID independent of content
* @param buffer
* @return A string showing the buffer ID
*/
public static void idString(ByteBuffer buffer, StringBuilder out)
{
out.append(buffer.getClass().getSimpleName());
out.append("@");
if (buffer.hasArray() && buffer.arrayOffset()==4)
{
out.append('T');
byte[] array = buffer.array();
TypeUtil.toHex(array[0],out);
TypeUtil.toHex(array[1],out);
TypeUtil.toHex(array[2],out);
TypeUtil.toHex(array[3],out);
}
else
out.append(Integer.toHexString(System.identityHashCode(buffer)));
}
/* ------------------------------------------------------------ */
/** Convert Buffer to string ID independent of content
* @param buffer
* @return A string showing the buffer ID
*/
public static String toIDString(ByteBuffer buffer)
{
StringBuilder buf = new StringBuilder();
idString(buffer,buf);
return buf.toString();
}
/* ------------------------------------------------------------ */
/** Convert Buffer to a detail debug string of pointers and content
* @param buffer
@ -950,9 +987,7 @@ public class BufferUtil
return "null";
StringBuilder buf = new StringBuilder();
buf.append(buffer.getClass().getSimpleName());
buf.append("@");
buf.append(Integer.toHexString(System.identityHashCode(buffer)));
idString(buffer,buf);
buf.append("[p=");
buf.append(buffer.position());
buf.append(",l=");

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.util;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -31,42 +30,32 @@ import org.eclipse.jetty.util.log.Logger;
/**
* A facility to detect improper usage of resource pools.
* <p>
* Resource pools usually have a method to acquire a pooled resource
* and a method to released it back to the pool.
* Resource pools usually have a method to acquire a pooled resource and a method to released it back to the pool.
* <p>
* To detect if client code acquires a resource but never releases it,
* the resource pool can be modified to use a {@link LeakDetector}.
* The modified resource pool should call {@link #acquired(Object)} every time
* the method to acquire a resource is called, and {@link #released(Object)}
* every time the method to release the resource is called.
* To detect if client code acquires a resource but never releases it, the resource pool can be modified to use a
* {@link LeakDetector}. The modified resource pool should call {@link #acquired(Object)} every time the method to
* acquire a resource is called, and {@link #released(Object)} every time the method to release the resource is called.
* {@link LeakDetector} keeps track of these resources and invokes method
* {@link #leaked(org.eclipse.jetty.util.LeakDetector.LeakInfo)} when it detects that a resource
* has been leaked (that is, acquired but never released).
* {@link #leaked(org.eclipse.jetty.util.LeakDetector.LeakInfo)} when it detects that a resource has been leaked (that
* is, acquired but never released).
* <p>
* To detect whether client code releases a resource without having
* acquired it, the resource pool can be modified to check the return value
* of {@link #released(Object)}: if false, it means that the resource was
* not acquired.
* To detect whether client code releases a resource without having acquired it, the resource pool can be modified to
* check the return value of {@link #released(Object)}: if false, it means that the resource was not acquired.
* <p>
* IMPLEMENTATION NOTES
* <p>
* This class relies on {@link System#identityHashCode(Object)} to create
* a unique id for each resource passed to {@link #acquired(Object)} and
* {@link #released(Object)}. {@link System#identityHashCode(Object)} does
* not guarantee that it will not generate the same number for different
* objects, but in practice the chance of collision is rare.
* This class relies on {@link System#identityHashCode(Object)} to create a unique id for each resource passed to
* {@link #acquired(Object)} and {@link #released(Object)}. {@link System#identityHashCode(Object)} does not guarantee
* that it will not generate the same number for different objects, but in practice the chance of collision is rare.
* <p>
* {@link LeakDetector} uses {@link PhantomReference}s to detect leaks.
* {@link PhantomReference}s are enqueued in their {@link ReferenceQueue}
* <em>after</em> they have been garbage collected (differently from
* {@link WeakReference}s that are enqueued <em>before</em>).
* Since the resource is now garbage collected, {@link LeakDetector} checks
* whether it has been released and if not, it reports a leak.
* Using {@link PhantomReference}s is better than overriding {@link #finalize()}
* and works also in those cases where {@link #finalize()} is not
* overridable.
* {@link LeakDetector} uses {@link PhantomReference}s to detect leaks. {@link PhantomReference}s are enqueued in their
* {@link ReferenceQueue} <em>after</em> they have been garbage collected (differently from {@link WeakReference}s that
* are enqueued <em>before</em>). Since the resource is now garbage collected, {@link LeakDetector} checks whether it
* has been released and if not, it reports a leak. Using {@link PhantomReference}s is better than overriding
* {@link #finalize()} and works also in those cases where {@link #finalize()} is not overridable.
*
* @param <T> the resource type.
* @param <T>
* the resource type.
*/
public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
{
@ -79,33 +68,54 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
/**
* Tracks the resource as been acquired.
*
* @param resource the resource that has been acquired
* @return whether the resource has been tracked
* @param resource
* the resource that has been acquired
* @return true whether the resource has been acquired normally, false if the resource has detected a leak (meaning
* that another acquire occurred before a release of the same resource)
* @see #released(Object)
*/
public boolean acquired(T resource)
{
String id = id(resource);
return resources.putIfAbsent(id, new LeakInfo(resource, id)) == null;
LeakInfo info = resources.putIfAbsent(id,new LeakInfo(resource,id));
if (info != null)
{
// leak detected, prior acquire exists (not released)
LOG.warn("Prior Acquire from Stack",info.getStackFrames());
return false;
}
// normal behavior
return true;
}
/**
* Tracks the resource as been released.
*
* @param resource the resource that has been released
* @return whether the resource has been acquired
* @param resource
* the resource that has been released
* @return true whether the resource has been released normally (based on a previous acquire). false if the resource
* has been released without a prior acquire (such as a double release scenario)
* @see #acquired(Object)
*/
public boolean released(T resource)
{
String id = id(resource);
return resources.remove(id) != null;
LeakInfo info = resources.remove(id);
if (info != null)
{
// normal path
return true;
}
// leak detected (released without acquire)
return false;
}
/**
* Generates a unique ID for the given resource.
*
* @param resource the resource to generate the unique ID for
* @param resource
* the resource to generate the unique ID for
* @return the unique ID of the given resource
*/
protected String id(T resource)
@ -117,7 +127,7 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
protected void doStart() throws Exception
{
super.doStart();
thread = new Thread(this, getClass().getSimpleName());
thread = new Thread(this,getClass().getSimpleName());
thread.setDaemon(true);
thread.start();
}
@ -139,7 +149,7 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
@SuppressWarnings("unchecked")
LeakInfo leakInfo = (LeakInfo)queue.remove();
if (LOG.isDebugEnabled())
LOG.debug("Resource GC'ed: {}", leakInfo);
LOG.debug("Resource GC'ed: {}",leakInfo);
if (resources.remove(leakInfo.id) != null)
leaked(leakInfo);
}
@ -153,11 +163,12 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
/**
* Callback method invoked by {@link LeakDetector} when it detects that a resource has been leaked.
*
* @param leakInfo the information about the leak
* @param leakInfo
* the information about the leak
*/
protected void leaked(LeakInfo leakInfo)
{
LOG.warn("Resource leaked: " + leakInfo.description, leakInfo.stackFrames);
LOG.warn("Resource leaked: " + leakInfo.description,leakInfo.stackFrames);
}
/**
@ -171,7 +182,7 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
private LeakInfo(T referent, String id)
{
super(referent, queue);
super(referent,queue);
this.id = id;
this.description = referent.toString();
this.stackFrames = new Throwable();