Merged branch 'jetty-9.2.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-03-07 23:26:59 +01:00
commit b7715fb3eb
16 changed files with 548 additions and 226 deletions

View File

@ -58,7 +58,7 @@ import org.eclipse.jetty.util.log.Logger;
* {@link HttpContent} may not have content, if the related {@link ContentProvider} is {@code null}, and this
* is reflected by {@link #hasContent()}.
* <p />
* {@link HttpContent} may have {@link DeferredContentProvider deferred content}, in which case {@link #advance()}
* {@link HttpContent} may have {@link AsyncContentProvider deferred content}, in which case {@link #advance()}
* moves the cursor to a position that provides {@code null} {@link #getByteBuffer() buffer} and
* {@link #getContent() content}. When the deferred content is available, a further call to {@link #advance()}
* will move the cursor to a position that provides non {@code null} buffer and content.
@ -124,7 +124,34 @@ public class HttpContent implements Callback, Closeable
*/
public boolean advance()
{
if (isLast())
boolean advanced;
boolean hasNext;
ByteBuffer bytes;
if (iterator instanceof Synchronizable)
{
synchronized (((Synchronizable)iterator).getLock())
{
advanced = iterator.hasNext();
bytes = advanced ? iterator.next() : null;
hasNext = advanced && iterator.hasNext();
}
}
else
{
advanced = iterator.hasNext();
bytes = advanced ? iterator.next() : null;
hasNext = advanced && iterator.hasNext();
}
if (advanced)
{
buffer = bytes;
content = bytes == null ? null : bytes.slice();
if (LOG.isDebugEnabled())
LOG.debug("Advanced content to {} chunk {}", hasNext ? "next" : "last", bytes);
return bytes != null;
}
else
{
if (content != AFTER)
{
@ -134,14 +161,6 @@ public class HttpContent implements Callback, Closeable
}
return false;
}
else
{
ByteBuffer buffer = this.buffer = iterator.next();
if (LOG.isDebugEnabled())
LOG.debug("Advanced content to {} chunk {}", isLast() ? "last" : "next", buffer);
content = buffer == null ? null : buffer.slice();
return buffer != null;
}
}
/**

View File

@ -144,6 +144,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case EXPECTING_WITH_CONTENT:
case PROCEEDING_WITH_CONTENT:
case WAITING:
case COMPLETED:
case FAILED:
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {}", current);
@ -151,7 +153,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return;
}
}
}
@ -169,8 +172,26 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
SenderState newSenderState = SenderState.SENDING;
if (expects100Continue(request))
newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
if (!updateSenderState(SenderState.IDLE, newSenderState))
throw illegalSenderState(SenderState.IDLE);
out: while (true)
{
SenderState current = senderState.get();
switch (current)
{
case IDLE:
case COMPLETED:
{
if (updateSenderState(current, newSenderState))
break out;
break;
}
default:
{
illegalSenderState(current);
return;
}
}
}
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the sender state has been updated
@ -424,16 +445,19 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
protected void reset()
{
HttpContent content = this.content;
this.content = null;
content.close();
content = null;
senderState.set(SenderState.IDLE);
senderState.set(SenderState.COMPLETED);
}
protected void dispose()
{
HttpContent content = this.content;
this.content = null;
if (content != null)
content.close();
senderState.set(SenderState.FAILED);
}
public void proceed(HttpExchange exchange, Throwable failure)
@ -482,16 +506,23 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case WAITING:
{
// We received the 100 Continue, now send the content if any.
if (!updateSenderState(current, SenderState.SENDING))
throw illegalSenderState(current);
if (LOG.isDebugEnabled())
LOG.debug("Proceeding while waiting");
contentCallback.iterate();
if (updateSenderState(current, SenderState.SENDING))
{
if (LOG.isDebugEnabled())
LOG.debug("Proceeding while waiting");
contentCallback.iterate();
return;
}
break;
}
case FAILED:
{
return;
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return;
}
}
}
@ -518,9 +549,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return updated;
}
private RuntimeException illegalSenderState(SenderState current)
private void illegalSenderState(SenderState current)
{
return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
anyToFailure(new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead"));
}
@Override
@ -609,7 +640,15 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
/**
* {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
*/
PROCEEDING_WITH_CONTENT
PROCEEDING_WITH_CONTENT,
/**
* {@link HttpSender} has finished to send the request
*/
COMPLETED,
/**
* {@link HttpSender} has failed to send the request
*/
FAILED
}
private class CommitCallback implements Callback
@ -619,6 +658,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
try
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
process();
}
@ -631,6 +673,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void failed(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
@ -645,7 +690,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!headersToCommit(request))
return;
HttpContent content = HttpSender.this.content;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
if (!content.hasContent())
{
@ -708,9 +755,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
updateSenderState(current, SenderState.SENDING);
break;
}
case FAILED:
{
return;
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return;
}
}
}
@ -720,16 +772,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private class ContentCallback extends IteratingCallback
{
private HttpExchange exchange;
@Override
protected Action process() throws Exception
{
HttpExchange exchange = this.exchange = getHttpExchange();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return Action.IDLE;
HttpContent content = HttpSender.this.content;
if (content == null)
return Action.IDLE;
while (true)
{
boolean advanced = content.advance();
@ -749,7 +802,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.IDLE;
}
SenderState current = HttpSender.this.senderState.get();
SenderState current = senderState.get();
switch (current)
{
case SENDING:
@ -769,7 +822,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
default:
{
throw illegalSenderState(current);
illegalSenderState(current);
return Action.IDLE;
}
}
}
@ -778,15 +832,24 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
ByteBuffer buffer = content.getContent();
someToContent(exchange.getRequest(), buffer);
content.succeeded();
super.succeeded();
}
@Override
public void onCompleteFailure(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
@ -804,16 +867,22 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void succeeded()
{
content.succeeded();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
someToSuccess(exchange);
}
@Override
public void failed(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}

View File

@ -25,12 +25,18 @@ import org.eclipse.jetty.util.LeakDetector;
public class LeakTrackingConnectionPool extends ConnectionPool
{
private final LeakDetector<Connection> leakDetector;
private final LeakDetector<Connection> leakDetector = new LeakDetector<Connection>()
{
@Override
protected void leaked(LeakInfo leakInfo)
{
LeakTrackingConnectionPool.this.leaked(leakInfo);
}
};
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester, LeakDetector<Connection> leakDetector)
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
this.leakDetector = leakDetector;
start();
}
@ -69,13 +75,18 @@ public class LeakTrackingConnectionPool extends ConnectionPool
protected void acquired(Connection connection)
{
if (!leakDetector.acquired(connection))
LOG.info("Connection {}@{} not tracked", connection, System.identityHashCode(connection));
LOG.info("Connection {}@{} not tracked", connection, leakDetector.id(connection));
}
@Override
protected void released(Connection connection)
{
if (!leakDetector.released(connection))
LOG.info("Connection {}@{} released but not acquired", connection, System.identityHashCode(connection));
LOG.info("Connection {}@{} released but not acquired", connection, leakDetector.id(connection));
}
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
LOG.info("Connection " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
}
}

View File

@ -0,0 +1,45 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
/**
* <p>Implementations of this interface expose a lock object
* via {@link #getLock()} so that callers can synchronize
* externally on that lock:</p>
* <pre>
* if (iterator instanceof Synchronizable)
* {
* Object element = null;
* synchronized (((Synchronizable)iterator).getLock())
* {
* if (iterator.hasNext())
* element = iterator.next();
* }
* }
* </pre>
* <p>In the example above, the calls to {@code hasNext()} and
* {@code next()} are performed "atomically".</p>
*/
public interface Synchronizable
{
/**
* @return the lock object to synchronize on
*/
public Object getLock();
}

View File

@ -27,11 +27,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.AsyncContentProvider;
import org.eclipse.jetty.client.Synchronizable;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -90,7 +90,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
private static final Chunk CLOSE = new Chunk(BufferUtil.EMPTY_BUFFER, Callback.Adapter.INSTANCE);
private final Object lock = this;
private final Queue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
private final ArrayQueue<Chunk> chunks = new ArrayQueue<>(4, 64, lock);
private final AtomicReference<Listener> listener = new AtomicReference<>();
private final DeferredContentProviderIterator iterator = new DeferredContentProviderIterator();
private final AtomicBoolean closed = new AtomicBoolean();
@ -241,7 +241,7 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
return iterator;
}
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback
private class DeferredContentProviderIterator implements Iterator<ByteBuffer>, Callback, Synchronizable
{
private Chunk current;
@ -261,7 +261,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
{
Chunk chunk = current = chunks.poll();
if (chunk == CLOSE)
{
// Slow path: reinsert the CLOSE chunk
// so that hasNext() works correctly.
chunks.add(0, CLOSE);
throw new NoSuchElementException();
}
return chunk == null ? null : chunk.buffer;
}
}
@ -308,6 +313,12 @@ public class DeferredContentProvider implements AsyncContentProvider, Callback,
for (Chunk chunk : chunks)
chunk.callback.failed(x);
}
@Override
public Object getLock()
{
return lock;
}
}
public static class Chunk

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.client;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -31,7 +28,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -62,6 +59,9 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class HttpClientLoadTest extends AbstractHttpClientServerTest
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
@ -76,6 +76,8 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
{
int cores = Runtime.getRuntime().availableProcessors();
final AtomicLong connectionLeaks = new AtomicLong();
start(new LoadHandler());
server.stop();
server.removeConnector(connector);
@ -88,8 +90,6 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
client.stop();
final LeakDetector<Connection> connectionLeakDetector = new LeakDetector<Connection>();
HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP()
{
@Override
@ -100,7 +100,14 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, connectionLeakDetector);
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo resource)
{
connectionLeaks.incrementAndGet();
}
};
}
};
}
@ -131,15 +138,17 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
run(random, iterations);
}
System.gc();
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), is(0L));
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), is(0L));
assertThat("Connection Leaks", connectionLeakDetector.getUnreleasedCount(), is(0L));
assertThat("Connection Leaks", connectionLeaks.get(), is(0L));
}
private void run(Random random, int iterations) throws InterruptedException

View File

@ -0,0 +1,156 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.Callback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class DeferredContentProviderTest
{
private ExecutorService executor;
@Before
public void prepare() throws Exception
{
executor = Executors.newCachedThreadPool();
}
@After
public void dispose() throws Exception
{
executor.shutdownNow();
}
@Test
public void testWhenEmptyFlushDoesNotBlock() throws Exception
{
final DeferredContentProvider provider = new DeferredContentProvider();
Future<?> task = executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
provider.flush();
return null;
}
});
Assert.assertTrue(await(task, 5, TimeUnit.SECONDS));
}
@Test
public void testOfferFlushBlocksUntilSucceeded() throws Exception
{
final DeferredContentProvider provider = new DeferredContentProvider();
Iterator<ByteBuffer> iterator = provider.iterator();
provider.offer(ByteBuffer.allocate(0));
Future<?> task = executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
provider.flush();
return null;
}
});
// Wait until flush() blocks.
Assert.assertFalse(await(task, 1, TimeUnit.SECONDS));
// Consume the content and succeed the callback.
iterator.next();
((Callback)iterator).succeeded();
// Flush should return.
Assert.assertTrue(await(task, 5, TimeUnit.SECONDS));
}
@Test
public void testCloseFlushDoesNotBlock() throws Exception
{
final DeferredContentProvider provider = new DeferredContentProvider();
provider.close();
Future<?> task = executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
provider.flush();
return null;
}
});
// Wait until flush() blocks.
Assert.assertTrue(await(task, 5, TimeUnit.SECONDS));
}
@Test
public void testCloseNextHasNextReturnsFalse() throws Exception
{
DeferredContentProvider provider = new DeferredContentProvider();
Iterator<ByteBuffer> iterator = provider.iterator();
provider.close();
Assert.assertFalse(iterator.hasNext());
try
{
iterator.next();
Assert.fail();
}
catch (NoSuchElementException x)
{
// Expected
}
Assert.assertFalse(iterator.hasNext());
}
private boolean await(Future<?> task, long time, TimeUnit unit) throws Exception
{
try
{
task.get(time, unit);
return true;
}
catch (TimeoutException x)
{
return false;
}
}
}

View File

@ -18,15 +18,13 @@
package org.eclipse.jetty.fcgi.server;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
import org.eclipse.jetty.http.HttpScheme;
@ -42,13 +40,16 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Rule;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public abstract class AbstractHttpClientServerTest
{
@Rule
public final TestTracker tracker = new TestTracker();
private LeakTrackingByteBufferPool serverBufferPool;
private LeakTrackingByteBufferPool clientBufferPool;
private LeakDetector<Connection> connectionLeakDetector;
private final AtomicLong connectionLeaks = new AtomicLong();
protected Server server;
protected ServerConnector connector;
protected HttpClient client;
@ -71,8 +72,6 @@ public abstract class AbstractHttpClientServerTest
QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client");
connectionLeakDetector = new LeakDetector<Connection>();
client = new HttpClient(new HttpClientTransportOverFCGI(1, false, "")
{
@Override
@ -83,7 +82,14 @@ public abstract class AbstractHttpClientServerTest
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this, connectionLeakDetector);
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
connectionLeaks.incrementAndGet();
}
};
}
};
}
@ -98,15 +104,16 @@ public abstract class AbstractHttpClientServerTest
public void dispose() throws Exception
{
System.gc();
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), is(0L));
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedUnreleased(), is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), is(0L));
assertThat("Connection Leaks", connectionLeakDetector.getUnreleasedCount(), is(0L));
assertThat("Connection Leaks", connectionLeaks.get(), is(0L));
if (client != null)
client.stop();

View File

@ -33,16 +33,24 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
private final LeakDetector<ByteBuffer> leakDetector = new LeakDetector<ByteBuffer>()
{
public String id(ByteBuffer resource)
public String id(ByteBuffer resource)
{
return BufferUtil.toIDString(resource);
}
@Override
protected void leaked(LeakInfo leakInfo)
{
leaked.incrementAndGet();
LeakTrackingByteBufferPool.this.leaked(leakInfo);
}
};
private final static boolean NOISY = Boolean.getBoolean(LeakTrackingByteBufferPool.class.getName() + ".NOISY");
private final ByteBufferPool delegate;
private final AtomicLong leakedReleases = new AtomicLong(0);
private final AtomicLong leakedAcquires = new AtomicLong(0);
private final AtomicLong leaked = new AtomicLong(0);
public LeakTrackingByteBufferPool(ByteBufferPool delegate)
{
@ -54,12 +62,12 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer buffer = delegate.acquire(size,direct);
ByteBuffer buffer = delegate.acquire(size, direct);
boolean leaked = leakDetector.acquired(buffer);
if (NOISY || !leaked)
{
leakedAcquires.incrementAndGet();
LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s",leakDetector.id(buffer),leaked ? "normal" : "LEAK"),
LOG.info(String.format("ByteBuffer acquire %s leaked.acquired=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"),
new Throwable("LeakStack.Acquire"));
}
return buffer;
@ -71,46 +79,47 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
if (buffer == null)
return;
boolean leaked = leakDetector.released(buffer);
if (NOISY || !leaked) {
if (NOISY || !leaked)
{
leakedReleases.incrementAndGet();
LOG.info(String.format("ByteBuffer release %s leaked.released=%s",leakDetector.id(buffer),leaked ? "normal" : "LEAK"),new Throwable(
LOG.info(String.format("ByteBuffer release %s leaked.released=%s", leakDetector.id(buffer), leaked ? "normal" : "LEAK"), new Throwable(
"LeakStack.Release"));
}
delegate.release(buffer);
}
public void clearTracking()
{
leakDetector.clear();
leakedAcquires.set(0);
leakedReleases.set(0);
}
/**
* Get the count of BufferPool.acquire() calls that detected a leak
* @return count of BufferPool.acquire() calls that detected a leak
*/
public long getLeakedAcquires()
{
return leakedAcquires.get();
}
/**
* Get the count of BufferPool.release() calls that detected a leak
* @return count of BufferPool.release() calls that detected a leak
*/
public long getLeakedReleases()
{
return leakedReleases.get();
}
/**
* At the end of the run, when the LeakDetector runs, this reports the
* number of unreleased resources.
* @return count of resources that were acquired but not released (byt the end of the run)
* @return count of resources that were acquired but not released
*/
public long getLeakedUnreleased()
public long getLeakedResources()
{
return leakDetector.getUnreleasedCount();
return leaked.get();
}
protected void leaked(LeakDetector<ByteBuffer>.LeakInfo leakInfo)
{
LOG.warn("ByteBuffer " + leakInfo.getResourceDescription() + " leaked at:", leakInfo.getStackFrames());
}
}

View File

@ -119,23 +119,23 @@ public class MappedByteBufferPool implements ByteBufferPool
{
return direct ? directBuffers : heapBuffers;
}
private static AtomicInteger __tag = new AtomicInteger();
public static class Tagged extends MappedByteBufferPool
{
private final AtomicInteger tag = new AtomicInteger();
public ByteBuffer createIndirect(int capacity)
{
ByteBuffer buffer = BufferUtil.allocate(capacity+4);
ByteBuffer buffer = BufferUtil.allocate(capacity + 4);
buffer.limit(4);
buffer.putInt(0,__tag.incrementAndGet());
buffer.putInt(0, tag.incrementAndGet());
buffer.position(4);
buffer.limit(buffer.capacity());
ByteBuffer slice = buffer.slice();
BufferUtil.clear(slice);
return slice;
}
protected ByteBuffer createDirect(int capacity)
{
return createIndirect(capacity);

View File

@ -390,7 +390,7 @@ public abstract class AbstractProxyServlet extends HttpServlet
clientRequest.getHeader(HttpHeader.TRANSFER_ENCODING.asString()) != null;
}
protected void copyHeaders(HttpServletRequest clientRequest, Request proxyRequest)
protected void copyRequestHeaders(HttpServletRequest clientRequest, Request proxyRequest)
{
// First clear possibly existing headers, as we are going to copy those from the client request.
proxyRequest.getHeaders().clear();

View File

@ -28,7 +28,6 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
@ -82,7 +81,7 @@ public class AsyncMiddleManServlet extends AbstractProxyServlet
boolean hasContent = hasContent(clientRequest);
copyHeaders(clientRequest, proxyRequest);
copyRequestHeaders(clientRequest, proxyRequest);
addProxyHeaders(clientRequest, proxyRequest);

View File

@ -108,6 +108,15 @@ public class ProxyServlet extends AbstractProxyServlet
sendProxyRequest(request, response, proxyRequest);
}
/**
* @deprecated use {@link #copyRequestHeaders(HttpServletRequest, Request)} instead
*/
@Deprecated
protected void copyHeaders(HttpServletRequest clientRequest, Request proxyRequest)
{
copyRequestHeaders(clientRequest, proxyRequest);
}
protected ContentProvider proxyRequestContent(final Request proxyRequest, final HttpServletRequest request) throws IOException
{
return new ProxyInputStreamContentProvider(proxyRequest, request, request.getInputStream());

View File

@ -19,15 +19,11 @@
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -38,8 +34,6 @@ import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.proxy.AbstractProxyServlet;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
@ -50,36 +44,37 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
public class Async502Loop
@RunWith(Parameterized.class)
public class AsyncProxyServletLoadTest
{
private static final Logger LOG = Log.getLogger(Async502Loop.class);
private static final String PROXIED_HEADER = "X-Proxied";
public static void main(String[] args)
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> data()
{
try
{
new Async502Loop().loop();
}
catch (Throwable t)
{
t.printStackTrace(System.err);
}
return Arrays.asList(new Object[][]{
{AsyncProxyServlet.class},
{AsyncMiddleManServlet.class}
});
}
private static final Logger LOG = Log.getLogger(AsyncProxyServletLoadTest.class);
private static final String PROXIED_HEADER = "X-Proxied";
private HttpClient client;
private Server proxy;
private ServerConnector proxyConnector;
private ServletContextHandler proxyContext;
private AbstractProxyServlet proxyServlet;
private Server server;
private ServerConnector serverConnector;
public Async502Loop()
public AsyncProxyServletLoadTest(Class<?> proxyServletClass) throws Exception
{
proxyServlet = new AsyncProxyServlet();
// proxyServlet = new AsyncMiddleManServlet();
proxyServlet = (AbstractProxyServlet)proxyServletClass.newInstance();
}
private void startServer(HttpServlet servlet) throws Exception
@ -90,19 +85,14 @@ public class Async502Loop
serverConnector = new ServerConnector(server);
server.addConnector(serverConnector);
ServletContextHandler appCtx = new ServletContextHandler(server,"/",true,false);
ServletContextHandler appCtx = new ServletContextHandler(server, "/", true, false);
ServletHolder appServletHolder = new ServletHolder(servlet);
appCtx.addServlet(appServletHolder,"/*");
appCtx.addServlet(appServletHolder, "/*");
server.start();
}
private void startProxy() throws Exception
{
startProxy(new HashMap<String, String>());
}
private void startProxy(Map<String, String> initParams) throws Exception
{
QueuedThreadPool proxyPool = new QueuedThreadPool();
proxyPool.setName("proxy");
@ -111,36 +101,28 @@ public class Async502Loop
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
String value = initParams.get("outputBufferSize");
if (value != null)
configuration.setOutputBufferSize(Integer.valueOf(value));
proxyConnector = new ServerConnector(proxy,new HttpConnectionFactory(configuration));
proxyConnector = new ServerConnector(proxy, new HttpConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
proxyContext = new ServletContextHandler(proxy,"/",true,false);
ServletContextHandler proxyContext = new ServletContextHandler(proxy, "/", true, false);
ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
proxyServletHolder.setInitParameters(initParams);
proxyContext.addServlet(proxyServletHolder,"/*");
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
}
private void startClient() throws Exception
{
client = prepareClient();
}
private HttpClient prepareClient() throws Exception
{
QueuedThreadPool clientPool = new QueuedThreadPool();
clientPool.setName("client");
HttpClient result = new HttpClient();
result.setExecutor(clientPool);
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost",proxyConnector.getLocalPort()));
result.getProxyConfiguration().getProxies().add(new HttpProxy("localhost", proxyConnector.getLocalPort()));
result.start();
return result;
client = result;
}
@After
public void dispose() throws Exception
{
client.stop();
@ -148,60 +130,8 @@ public class Async502Loop
server.stop();
}
private static class ClientLoop implements Runnable
{
private final CountDownLatch active;
private final AtomicBoolean ok;
private final HttpClient client;
private final String host;
private final int port;
public ClientLoop(CountDownLatch activeClientLatch, AtomicBoolean atomicOk, HttpClient client, String serverHost, int serverPort)
{
this.active = activeClientLatch;
this.ok = atomicOk;
this.client = client;
this.host = serverHost;
this.port = serverPort;
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
LOG.info("Starting thread {}",threadName);
try
{
while (ok.get())
{
byte[] content = new byte[1024];
new Random().nextBytes(content);
ContentResponse response = client.newRequest(host,port).method(HttpMethod.POST).content(new BytesContentProvider(content))
.timeout(5,TimeUnit.SECONDS).send();
if (response.getStatus() != 200)
{
LOG.warn("Got response <{}>, expecting <{}>",response.getStatus(),200);
// allow all ClientLoops to finish
ok.set(false);
}
}
}
catch (InterruptedException | TimeoutException | ExecutionException e)
{
LOG.warn("Error processing request",e);
ok.set(false);
}
finally
{
LOG.info("Shutting down thread {}",threadName);
active.countDown();
}
}
}
@SuppressWarnings("serial")
private void loop() throws Exception
@Test
public void test() throws Exception
{
startServer(new HttpServlet()
{
@ -209,33 +139,98 @@ public class Async502Loop
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
if (req.getHeader("Via") != null)
resp.addHeader(PROXIED_HEADER,"true");
IO.copy(req.getInputStream(),resp.getOutputStream());
resp.addHeader(PROXIED_HEADER, "true");
IO.copy(req.getInputStream(), resp.getOutputStream());
}
});
startProxy();
startClient();
// Number of clients to simulate
int clientCount = 5;
int clientCount = Runtime.getRuntime().availableProcessors();
// Latch for number of clients still active (used to terminate test)
final CountDownLatch activeClientLatch = new CountDownLatch(clientCount);
// Atomic Boolean to track that its OK to still continue looping.
// When this goes false, that means one of the client threads has
// encountered an error condition, and should allow all remaining
// client threads to finish cleanly.
final AtomicBoolean atomicOk = new AtomicBoolean(true);
final AtomicBoolean success = new AtomicBoolean(true);
int iterations = 1000;
// Start clients
for (int i = 0; i < clientCount; i++)
{
ClientLoop r = new ClientLoop(activeClientLatch,atomicOk,client,"localhost",serverConnector.getLocalPort());
ClientLoop r = new ClientLoop(activeClientLatch, success, client, "localhost", serverConnector.getLocalPort(), iterations);
String name = "client-" + i;
Thread thread = new Thread(r,name);
Thread thread = new Thread(r, name);
thread.start();
}
activeClientLatch.await();
dispose();
Assert.assertTrue(activeClientLatch.await(clientCount * iterations * 10, TimeUnit.MILLISECONDS));
Assert.assertTrue(success.get());
}
private static class ClientLoop implements Runnable
{
private final CountDownLatch active;
private final AtomicBoolean success;
private final HttpClient client;
private final String host;
private final int port;
private int iterations;
public ClientLoop(CountDownLatch activeClientLatch, AtomicBoolean success, HttpClient client, String serverHost, int serverPort, int iterations)
{
this.active = activeClientLatch;
this.success = success;
this.client = client;
this.host = serverHost;
this.port = serverPort;
this.iterations = iterations;
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
LOG.info("Starting thread {}", threadName);
try
{
while (success.get())
{
--iterations;
byte[] content = new byte[1024];
new Random().nextBytes(content);
ContentResponse response = client.newRequest(host, port).method(HttpMethod.POST).content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS).send();
if (response.getStatus() != 200)
{
LOG.warn("Got response <{}>, expecting <{}>", response.getStatus(), 200);
// allow all ClientLoops to finish
success.set(false);
}
else
{
if (iterations == 0)
break;
}
}
}
catch (Throwable x)
{
LOG.warn("Error processing request", x);
success.set(false);
}
finally
{
LOG.info("Shutting down thread {}", threadName);
active.countDown();
}
}
}
}

View File

@ -20,9 +20,9 @@ package org.eclipse.jetty.util;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
@ -55,8 +55,7 @@ import org.eclipse.jetty.util.log.Logger;
* has been released and if not, it reports a leak. Using {@link PhantomReference}s is better than overriding
* {@link #finalize()} and works also in those cases where {@link #finalize()} is not overridable.
*
* @param <T>
* the resource type.
* @param <T> the resource type.
*/
public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
{
@ -64,14 +63,12 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
private final ReferenceQueue<T> queue = new ReferenceQueue<>();
private final ConcurrentMap<String, LeakInfo> resources = new ConcurrentHashMap<>();
private final AtomicLong unreleasedCount = new AtomicLong(0);
private Thread thread;
/**
* Tracks the resource as been acquired.
*
* @param resource
* the resource that has been acquired
* @param resource the resource that has been acquired
* @return true whether the resource has been acquired normally, false if the resource has detected a leak (meaning
* that another acquire occurred before a release of the same resource)
* @see #released(Object)
@ -79,21 +76,20 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
public boolean acquired(T resource)
{
String id = id(resource);
LeakInfo info = resources.putIfAbsent(id,new LeakInfo(resource,id));
LeakInfo info = resources.putIfAbsent(id, new LeakInfo(resource,id));
if (info != null)
{
// leak detected, prior acquire exists (not released)
// Leak detected, prior acquire exists (not released) or id clash.
return false;
}
// normal behavior
// Normal behavior.
return true;
}
/**
* Tracks the resource as been released.
*
* @param resource
* the resource that has been released
* @param resource the resource that has been released
* @return true whether the resource has been released normally (based on a previous acquire). false if the resource
* has been released without a prior acquire (such as a double release scenario)
* @see #acquired(Object)
@ -104,19 +100,18 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
LeakInfo info = resources.remove(id);
if (info != null)
{
// normal behavior
// Normal behavior.
return true;
}
// leak detected (released without acquire)
// Leak detected (released without acquire).
return false;
}
/**
* Generates a unique ID for the given resource.
*
* @param resource
* the resource to generate the unique ID for
* @param resource the resource to generate the unique ID for
* @return the unique ID of the given resource
*/
public String id(T resource)
@ -164,23 +159,11 @@ public class LeakDetector<T> extends AbstractLifeCycle implements Runnable
/**
* Callback method invoked by {@link LeakDetector} when it detects that a resource has been leaked.
*
* @param leakInfo
* the information about the leak
* @param leakInfo the information about the leak
*/
protected void leaked(LeakInfo leakInfo)
{
LOG.warn("Resource leaked: " + leakInfo.description,leakInfo.stackFrames);
unreleasedCount.incrementAndGet();
}
public void clear()
{
unreleasedCount.set(0);
}
public long getUnreleasedCount()
{
return unreleasedCount.get();
}
/**

View File

@ -18,15 +18,15 @@
package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class LeakTrackingBufferPoolRule extends LeakTrackingByteBufferPool implements TestRule
{
private final String id;
@ -41,7 +41,7 @@ public class LeakTrackingBufferPoolRule extends LeakTrackingByteBufferPool imple
{
assertThat("Leaked Acquires Count for [" + id + "]",getLeakedAcquires(),is(0L));
assertThat("Leaked Releases Count for [" + id + "]",getLeakedReleases(),is(0L));
assertThat("Leaked Unrelesed Count for [" + id + "]",getLeakedUnreleased(),is(0L));
assertThat("Leaked Resource Count for [" + id + "]", getLeakedResources(),is(0L));
}
@Override