Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

This commit is contained in:
Jesse McConnell 2012-07-20 15:38:57 -05:00
commit f8ca13c3b3
16 changed files with 297 additions and 105 deletions

View File

@ -22,9 +22,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class BlockingArrayQueueTest
{
@ -138,6 +141,7 @@ public class BlockingArrayQueueTest
}
@Test
@Slow
public void testTake() throws Exception
{
final String[] data=new String[4];
@ -184,6 +188,7 @@ public class BlockingArrayQueueTest
volatile boolean _running;
@Test
@Slow
public void testConcurrentAccess() throws Exception
{
final int THREADS=50;

View File

@ -20,17 +20,21 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Test;
import org.junit.runner.RunWith;
/* ------------------------------------------------------------ */
/** Util meta Tests.
*
/**
* Util meta Tests.
*/
@RunWith(AdvancedRunner.class)
public class DateCacheTest
{
/* ------------------------------------------------------------ */
@Test
@Slow
public void testDateCache() throws Exception
{
//@WAS: Test t = new Test("org.eclipse.jetty.util.DateCache");

View File

@ -1,4 +1,3 @@
package org.eclipse.jetty.util;
//========================================================================
//Copyright (c) 2006-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
@ -11,6 +10,7 @@ package org.eclipse.jetty.util;
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.util;
import java.io.File;
import java.io.FileOutputStream;
@ -19,16 +19,20 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.FS;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Scanner.Notification;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class ScannerTest
{
static File _directory;
@ -97,6 +101,7 @@ public class ScannerTest
}
@Test
@Slow
public void testAddedChangeRemove() throws Exception
{
// TODO needs to be further investigated

View File

@ -21,9 +21,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class QueuedThreadPoolTest
{
final AtomicInteger _jobs=new AtomicInteger();
@ -62,6 +65,7 @@ public class QueuedThreadPoolTest
@Test
@Slow
public void testThreadPool() throws Exception
{
QueuedThreadPool tp= new QueuedThreadPool();
@ -137,6 +141,7 @@ public class QueuedThreadPoolTest
}
@Test
@Slow
public void testShrink() throws Exception
{
final AtomicInteger sleep = new AtomicInteger(100);

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.jdt.junit.launchconfig">
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS">
<listEntry value="/websocket-core/src/test/java/org/eclipse/jetty/websocket/AllTests.java"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES">
<listEntry value="1"/>
</listAttribute>
<stringAttribute key="org.eclipse.jdt.junit.CONTAINER" value=""/>
<booleanAttribute key="org.eclipse.jdt.junit.KEEPRUNNING_ATTR" value="false"/>
<stringAttribute key="org.eclipse.jdt.junit.TESTNAME" value=""/>
<stringAttribute key="org.eclipse.jdt.junit.TEST_KIND" value="org.eclipse.jdt.junit.loader.junit4"/>
<stringAttribute key="org.eclipse.jdt.launching.CLASSPATH_PROVIDER" value="org.eclipse.m2e.launchconfig.classpathProvider"/>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="org.eclipse.jetty.websocket.AllTests"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="websocket-core"/>
<stringAttribute key="org.eclipse.jdt.launching.SOURCE_PATH_PROVIDER" value="org.eclipse.m2e.launchconfig.sourcepathProvider"/>
</launchConfiguration>

View File

@ -67,10 +67,7 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
@Override
public void failed(C context, Throwable x)
{
if (LOG.isDebugEnabled())
{
LOG.debug("failed({},{})",context,x);
}
LOG.warn("failed(" + context + ")",x);
cancelTask();
callback.failed(context,x);
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
@ -33,6 +34,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -68,6 +70,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
private final FrameQueue queue;
private List<ExtensionConfig> extensions;
private boolean flushing;
private AtomicLong writes;
public WebSocketAsyncConnection(AsyncEndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
@ -79,6 +82,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
this.queue = new FrameQueue();
this.writes = new AtomicLong(0);
}
@Override
@ -97,7 +101,10 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
{
synchronized (queue)
{
LOG.debug("Completed Write of {} ({} frame(s) in queue)",frameBytes,queue.size());
if (LOG.isDebugEnabled())
{
LOG.debug("Completed Write of {} ({} frame(s) in queue)",frameBytes,queue.size());
}
flushing = false;
}
}
@ -117,20 +124,6 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
}
}
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
{
try
{
return endPoint.fill(buffer);
}
catch (IOException e)
{
LOG.warn(e);
terminateConnection(StatusCode.PROTOCOL,e.getMessage());
return -1;
}
}
public void flush()
{
FrameBytes<?> frameBytes = null;
@ -257,39 +250,52 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
if (frame.getOpCode().isControlFrame())
synchronized (queue)
{
ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.prepend(bytes);
}
else
{
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
if (frame.getOpCode().isControlFrame())
{
ControlFrameBytes<C> bytes = new ControlFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.prepend(bytes);
}
else
{
DataFrameBytes<C> bytes = new DataFrameBytes<C>(this,callback,context,frame);
scheduleTimeout(bytes);
queue.append(bytes);
}
}
flush();
}
private void read(ByteBuffer buffer)
{
while (true)
try
{
int filled = fill(getEndPoint(),buffer);
if (filled == 0)
while (true)
{
break;
}
if (filled < 0)
{
// IO error
terminateConnection(StatusCode.PROTOCOL,null);
break;
}
int filled = getEndPoint().fill(buffer);
if (filled == 0)
{
break;
}
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
parser.parse(buffer);
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
}
}
catch (IOException e)
{
LOG.warn(e);
terminateConnection(StatusCode.PROTOCOL,e.getMessage());
}
catch (CloseException e)
{
LOG.warn(e);
terminateConnection(e.getStatusCode(),e.getMessage());
}
}
@ -350,7 +356,11 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
try
{
endpoint.write(frameBytes.context,frameBytes,buffer);
// endpoint.flush();
long count = writes.incrementAndGet();
if ((count % 10) == 0)
{
LOG.info("Server wrote {} ByteBuffers",count);
}
}
catch (Throwable t)
{

View File

@ -180,8 +180,6 @@ public class Parser
}
try
{
LOG.debug("Parsing {} bytes",buffer.remaining());
// parse through all the frames in the buffer
while (parseFrame(buffer))
{

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.jdt.junit.launchconfig">
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS">
<listEntry value="/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/AllTests.java"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES">
<listEntry value="1"/>
</listAttribute>
<stringAttribute key="org.eclipse.jdt.junit.CONTAINER" value=""/>
<booleanAttribute key="org.eclipse.jdt.junit.KEEPRUNNING_ATTR" value="false"/>
<stringAttribute key="org.eclipse.jdt.junit.TESTNAME" value=""/>
<stringAttribute key="org.eclipse.jdt.junit.TEST_KIND" value="org.eclipse.jdt.junit.loader.junit4"/>
<stringAttribute key="org.eclipse.jdt.launching.CLASSPATH_PROVIDER" value="org.eclipse.m2e.launchconfig.classpathProvider"/>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="org.eclipse.jetty.websocket.server.AllTests"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="websocket-server"/>
<stringAttribute key="org.eclipse.jdt.launching.SOURCE_PATH_PROVIDER" value="org.eclipse.m2e.launchconfig.sourcepathProvider"/>
</launchConfiguration>

View File

@ -0,0 +1,13 @@
package org.eclipse.jetty.websocket.server;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(
{ org.eclipse.jetty.websocket.server.ab.AllTests.class, DeflateExtensionTest.class, FragmentExtensionTest.class, IdentityExtensionTest.class, LoadTest.class,
WebSocketInvalidVersionTest.class, WebSocketLoadRFC6455Test.class, WebSocketOverSSLTest.class, WebSocketServletRFCTest.class })
public class AllTests
{
/* let junit do the rest */
}

View File

@ -0,0 +1,142 @@
package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class LoadTest
{
@SuppressWarnings("serial")
public static class LoadServlet extends WebSocketServlet
{
@Override
public void registerWebSockets(WebSocketServerFactory factory)
{
factory.register(LoadSocket.class);
}
}
@WebSocket
public static class LoadSocket
{
private WebSocketConnection conn;
public static AtomicLong count = new AtomicLong(0);
@OnWebSocketConnect
public void onConnect(WebSocketConnection conn)
{
this.conn = conn;
}
@OnWebSocketMessage
public void onWebSocketText(String message)
{
try
{
conn.write("LOAD_TEXT",new FutureCallback<String>(),message);
long iter = count.incrementAndGet();
if ((iter % 100) == 0)
{
LOG.info("Echo'd back {} msgs",iter);
}
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
}
}
/**
* Thread to just send a mess of text messages.
*/
public static class TextGen implements Runnable
{
private final BlockheadClient client;
private final int iterations;
public TextGen(BlockheadClient client, int iterations)
{
this.client = client;
this.iterations = iterations;
}
@Override
public void run()
{
try
{
for (int i = 0; i < iterations; i++)
{
client.write(WebSocketFrame.text("msg-" + i));
if ((i % 100) == 0)
{
LOG.info("Client Wrote {} msgs",i);
}
}
LOG.info("Wrote {} msgs",iterations);
}
catch (IOException e)
{
LOG.warn(e);
}
}
}
private static final Logger LOG = Log.getLogger(LoadTest.class);
private static SimpleServletServer server;
@BeforeClass
public static void startServer() throws Exception
{
server = new SimpleServletServer(new LoadServlet());
server.start();
}
@AfterClass
public static void stopServer()
{
server.stop();
}
@Test
public void testManyMessages() throws Exception
{
ExecutorService threadPool = Executors.newCachedThreadPool();
BlockheadClient client = new BlockheadClient(server.getServerUri());
try
{
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
int iterations = 2000;
LoadSocket.count.set(0);
threadPool.execute(new TextGen(client,iterations));
client.readFrames(iterations,TimeUnit.SECONDS,10);
}
finally
{
client.close(StatusCode.NORMAL,"All Done");
}
}
}

View File

@ -0,0 +1,12 @@
package org.eclipse.jetty.websocket.server.ab;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses(
{ TestABCase5.class, TestABCase7_9.class })
public class AllTests
{
/* let junit do the rest */
}

View File

@ -17,7 +17,6 @@ package org.eclipse.jetty.websocket.server.ab;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
@ -25,11 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.Generator;
@ -37,8 +32,6 @@ import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.ByteBufferAssert;
import org.eclipse.jetty.websocket.server.SimpleServletServer;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.server.WebSocketServlet;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.junit.AfterClass;
@ -49,44 +42,6 @@ import org.junit.Test;
public class TestABCase5
{
@SuppressWarnings("serial")
public static class RFCServlet extends WebSocketServlet
{
@Override
public void registerWebSockets(WebSocketServerFactory factory)
{
factory.register(RFCSocket.class);
}
}
public static class RFCSocket extends WebSocketAdapter
{
private static Logger LOG = Log.getLogger(RFCSocket.class);
@Override
public void onWebSocketText(String message)
{
LOG.debug("onWebSocketText({})",message);
// Test the RFC 6455 close code 1011 that should close
// trigger a WebSocket server terminated close.
if (message.equals("CRASH"))
{
System.out.printf("Got OnTextMessage");
throw new RuntimeException("Something bad happened");
}
// echo the message back.
try
{
getConnection().write("ECHO_FROM_WEBSOCKET",new FutureCallback<String>(),message);
}
catch (IOException e)
{
e.printStackTrace(System.err);
}
}
}
private static final byte FIN = (byte)0x80;
private static final byte NOFIN = 0x00;

View File

@ -36,6 +36,7 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.HttpsURLConnection;
@ -65,6 +66,10 @@ import org.junit.Assert;
* <p>
* This client is <u>NOT</u> intended to be performant or follow the websocket spec religiously. In fact, being able to deviate from the websocket spec at will
* is desired for this client to operate properly for the unit testing within this module.
* <p>
* The BlockheadClient should never validate frames or bytes being sent for validity, against any sort of spec, or even sanity. It should, however be honest
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
* scope.
*/
public class BlockheadClient implements IncomingFrames
{
@ -88,6 +93,7 @@ public class BlockheadClient implements IncomingFrames
private byte[] clientmask = new byte[]
{ (byte)0xFF, (byte)0xFF, (byte)0xFF, (byte)0xFF };
private int timeout = 1000;
private AtomicInteger parseCount;
public BlockheadClient(URI destWebsocketURI) throws URISyntaxException
{
@ -105,6 +111,7 @@ public class BlockheadClient implements IncomingFrames
generator = new UnitGenerator();
parser = new Parser(policy);
parser.setIncomingFramesHandler(this);
parseCount = new AtomicInteger(0);
incomingFrameQueue = new LinkedBlockingDeque<>();
}
@ -210,6 +217,11 @@ public class BlockheadClient implements IncomingFrames
public void incoming(WebSocketFrame frame)
{
LOG.debug("incoming({})",frame);
int count = parseCount.incrementAndGet();
if ((count % 10) == 0)
{
LOG.info("Client parsed {} frames",count);
}
WebSocketFrame copy = new WebSocketFrame(frame); // make a copy
if (!incomingFrameQueue.offerLast(copy))
{
@ -274,6 +286,7 @@ public class BlockheadClient implements IncomingFrames
int len = 0;
while (incomingFrameQueue.size() < (startCount + expectedCount))
{
BufferUtil.clearToFill(buf);
len = read(buf);
if (len > 0)
{
@ -291,7 +304,8 @@ public class BlockheadClient implements IncomingFrames
}
if (!debug && (System.currentTimeMillis() > expireOn))
{
throw new TimeoutException("Timeout reading all of the desired frames");
throw new TimeoutException(String.format("Timeout reading all [%d] expected frames. (managed to read [%d] frames)",expectedCount,
incomingFrameQueue.size()));
}
}
}

View File

@ -1,6 +1,6 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.io.LEVEL=INFO
org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.websocket.generator.LEVEL=DEBUG
org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG
org.eclipse.jetty.io.LEVEL=WARN
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
org.eclipse.jetty.websocket.LEVEL=INFO
# org.eclipse.jetty.websocket.generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG

View File

@ -340,7 +340,6 @@
</plugin>
</plugins>
</reporting>
<!--
<repositories>
<repository>
<snapshots>
@ -351,7 +350,6 @@
<url>http://oss.sonatype.org/content/groups/jetty</url>
</repository>
</repositories>
-->
<modules>
<module>jetty-util</module>
<module>jetty-jmx</module>
@ -483,7 +481,7 @@
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<version>1.6.1</version>
<version>1.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>