425043 - Track whether pools are used correctly.

Added leak detection to SPDY load test.
This commit is contained in:
Simone Bordet 2014-01-08 16:17:08 +01:00
parent fb8d55e46e
commit dbee351543
4 changed files with 105 additions and 39 deletions

View File

@ -258,9 +258,9 @@ public class SPDYClient
public static class Factory extends ContainerLifeCycle
{
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Scheduler scheduler;
private final Executor executor;
private final ByteBufferPool bufferPool;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
private final long idleTimeout;
@ -292,6 +292,11 @@ public class SPDYClient
}
public Factory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, long idleTimeout)
{
this(executor, scheduler, null, sslContextFactory, idleTimeout);
}
public Factory(Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory sslContextFactory, long idleTimeout)
{
this.idleTimeout = idleTimeout;
setConnectTimeout(15000);
@ -306,6 +311,11 @@ public class SPDYClient
this.scheduler = scheduler;
addBean(scheduler);
if (bufferPool == null)
bufferPool = new MappedByteBufferPool();
this.bufferPool = bufferPool;
addBean(bufferPool);
this.sslContextFactory = sslContextFactory;
if (sslContextFactory != null)
addBean(sslContextFactory);

View File

@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -56,7 +57,7 @@ public abstract class AbstractTest
protected Server server;
protected SPDYClient.Factory clientFactory;
protected SPDYServerConnector connector;
protected ServerConnector connector;
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
@ -93,7 +94,7 @@ public abstract class AbstractTest
return new Server(pool);
}
protected SPDYServerConnector newSPDYServerConnector(Server server, ServerSessionFrameListener listener)
protected ServerConnector newSPDYServerConnector(Server server, ServerSessionFrameListener listener)
{
return new SPDYServerConnector(server, listener);
}
@ -110,8 +111,8 @@ public abstract class AbstractTest
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
}
clientFactory.start();
return clientFactory.newSPDYClient(version).connect(socketAddress, listener);
}

View File

@ -41,7 +41,6 @@ import org.junit.Test;
public class IdleTimeoutTest extends AbstractTest
{
private final int idleTimeout = 1000;
@Test
@ -170,7 +169,6 @@ public class IdleTimeoutTest extends AbstractTest
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
client.setIdleTimeout(idleTimeout);
Session session = client.connect(address, null);
@ -196,7 +194,6 @@ public class IdleTimeoutTest extends AbstractTest
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
client.setIdleTimeout(idleTimeout);
Session session = client.connect(address, null);
@ -229,7 +226,6 @@ public class IdleTimeoutTest extends AbstractTest
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(threadPool.getName() + "-client");
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
client.setIdleTimeout(idleTimeout);
Session session = client.connect(address, null);

View File

@ -31,28 +31,61 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.client.SPDYClient;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.Assert;
import org.junit.Test;
public class SynDataReplyDataLoadTest extends AbstractTest
{
@Test(timeout=60000)
private static final int TIMEOUT = 60000;
private static final Logger logger = Log.getLogger(SynDataReplyDataLoadTest.class);
@Test(timeout = TIMEOUT)
public void testSynDataReplyDataLoad() throws Exception
{
ServerSessionFrameListener serverSessionFrameListener = new ServerSessionFrameListener.Adapter()
final AtomicLong leaks = new AtomicLong();
LeakTrackingByteBufferPool serverBufferPool = new LeakTrackingByteBufferPool(new ArrayByteBufferPool())
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
leaks.incrementAndGet();
}
};
LeakTrackingByteBufferPool clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool())
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
leaks.incrementAndGet();
}
};
ServerSessionFrameListener listener = new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
@ -69,7 +102,27 @@ public class SynDataReplyDataLoadTest extends AbstractTest
};
}
};
final Session session = startClient(startServer(serverSessionFrameListener), null);
server = newServer();
connector = new ServerConnector(server, null, null, serverBufferPool, 1,
Runtime.getRuntime().availableProcessors() / 2, new SPDYServerConnectionFactory(SPDY.V3, listener));
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName(clientExecutor.getName() + "-client");
clientFactory = new SPDYClient.Factory(clientExecutor, null, clientBufferPool, null, 30000);
final Session session = startClient(SPDY.V3, startServer(SPDY.V3, listener), null);
final Thread testThread = Thread.currentThread();
Runnable timeout = new Runnable()
{
@Override
public void run()
{
logger.warn("Interrupting test, it is taking too long");
logger.warn("SERVER: {}", server.dump());
logger.warn("CLIENT: {}", clientFactory.dump());
testThread.interrupt();
}
};
final int iterations = 500;
final int count = 50;
@ -107,6 +160,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
});
}
Scheduler.Task timeoutTask = clientFactory.getScheduler().schedule(timeout, TIMEOUT / 2, TimeUnit.MILLISECONDS);
{
long begin = System.nanoTime();
List<Future<Object>> futures = threadPool.invokeAll(tasks);
@ -116,6 +170,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
long end = System.nanoTime();
System.err.printf("SYN+GET+DATA+GET completed in %d ms%n", TimeUnit.NANOSECONDS.toMillis(end - begin));
}
timeoutTask.cancel();
tasks.clear();
for (int i = 0; i < count; ++i)
@ -130,6 +185,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
});
}
timeoutTask = clientFactory.getScheduler().schedule(timeout, TIMEOUT / 2, TimeUnit.MILLISECONDS);
{
long begin = System.nanoTime();
List<Future<Object>> futures = threadPool.invokeAll(tasks);
@ -139,8 +195,10 @@ public class SynDataReplyDataLoadTest extends AbstractTest
long end = System.nanoTime();
System.err.printf("SYN+COMPLETED+DATA completed in %d ms%n", TimeUnit.NANOSECONDS.toMillis(end - begin));
}
timeoutTask.cancel();
threadPool.shutdown();
Assert.assertEquals(0, leaks.get());
}
private void synCompletedData(Session session, Fields headers, int iterations) throws Exception
@ -175,13 +233,14 @@ public class SynDataReplyDataLoadTest extends AbstractTest
}
}, new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream stream)
{
stream.data(new StringDataInfo("data_" + stream.getId(), true),
new Callback.Adapter());
}
});
@Override
public void succeeded(Stream stream)
{
stream.data(new StringDataInfo("data_" + stream.getId(), true),
new Callback.Adapter());
}
}
);
}
Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS));
Assert.assertTrue(counter.toString(), counter.isEmpty());
@ -198,27 +257,27 @@ public class SynDataReplyDataLoadTest extends AbstractTest
counter.put(index, index);
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, false, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertEquals(2, count.getAndDecrement());
latch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
// TCP can split the data frames, so I may be receiving more than 1 data frame
dataInfo.asBytes(true);
if (dataInfo.isClose())
{
Assert.assertEquals(1, count.getAndDecrement());
counter.remove(index);
latch.countDown();
}
}
});
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Assert.assertEquals(2, count.getAndDecrement());
latch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
// TCP can split the data frames, so I may be receiving more than 1 data frame
dataInfo.asBytes(true);
if (dataInfo.isClose())
{
Assert.assertEquals(1, count.getAndDecrement());
counter.remove(index);
latch.countDown();
}
}
});
stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data_" + stream.getId(), true));
}
Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS));