diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index b1d1ddc34a5..168ab29ca92 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -7,31 +7,31 @@ public abstract class AbstractEndPoint implements EndPoint private final long _created=System.currentTimeMillis(); private final InetSocketAddress _local; private final InetSocketAddress _remote; - private volatile int _maxIdleTime; + private volatile long _maxIdleTime; private volatile long _idleTimestamp=System.currentTimeMillis(); - + protected AbstractEndPoint(InetSocketAddress local,InetSocketAddress remote) { _local=local; _remote=remote; } - + @Override public long getCreatedTimeStamp() { return _created; } - + @Override - public int getMaxIdleTime() + public long getMaxIdleTime() { return _maxIdleTime; } @Override - public void setMaxIdleTime(int timeMs) + public void setMaxIdleTime(long timeMs) { _maxIdleTime=timeMs; } @@ -61,12 +61,7 @@ public abstract class AbstractEndPoint implements EndPoint { _idleTimestamp=System.currentTimeMillis(); } - - /* ------------------------------------------------------------ */ - public void onClose() - { - } - + /* ------------------------------------------------------------ */ @Override public String toString() @@ -79,5 +74,5 @@ public abstract class AbstractEndPoint implements EndPoint isOpen(), isOutputShutdown()); } - + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index 67dd836f2d1..c8d79edc42b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -21,7 +21,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn private AsyncConnection _connection; private final TimerTask _checkTimeout=new TimeoutTask(this); - + private final ReadInterest _readInterest = new ReadInterest() { @Override @@ -30,18 +30,18 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn if (_closed) throw new ClosedChannelException(); return _in==null || BufferUtil.hasContent(_in); - } + } }; - + private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override protected void onIncompleteFlushed() - { + { // Don't need to do anything here as takeOutput does the signalling. } }; - + public AsyncByteArrayEndPoint(Timer timer) { super(); @@ -85,7 +85,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn super.setOutput(out); _writeFlusher.completeWrite(); } - + @Override public void reset() { @@ -117,7 +117,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn { _connection=connection; } - + public void checkReadWriteTimeout(long now) { synchronized (this) @@ -136,7 +136,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn if (isOutputShutdown()) close(); notIdle(); - + TimeoutException timeout = new TimeoutException("idle "+idleForMs+"ms"); _readInterest.failed(timeout); _writeFlusher.failed(timeout); @@ -146,22 +146,26 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn } } + @Override + public void onOpen() + { + } + @Override public void onClose() { _checkTimeout.cancel(); - super.onClose(); } private static class TimeoutTask extends TimerTask { final WeakReference _endp; - + TimeoutTask(AsyncByteArrayEndPoint endp) { _endp=new WeakReference(endp); } - + @Override public void run() { @@ -172,5 +176,5 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn endp.checkReadWriteTimeout(System.currentTimeMillis()); } }; - + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index 9c213a80051..06c0c1b9466 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -17,7 +17,7 @@ import org.eclipse.jetty.util.FutureCallback; * some inefficiencies. *

* This class will frequently be used in conjunction with some of the utility - * implementations of {@link Callback}, such as {@link FutureCallback} and + * implementations of {@link Callback}, such as {@link FutureCallback} and * {@link ExecutorCallback}. Examples are: *

Blocking Read

* A FutureCallback can be used to block until an endpoint is ready to be filled @@ -40,21 +40,21 @@ import org.eclipse.jetty.util.FutureCallback; * } * public void onFailed(String context,Throwable cause) {...} * }); - * The executor callback can also be customized to not dispatch in some circumstances when + * The executor callback can also be customized to not dispatch in some circumstances when * it knows it can use the callback thread and does not need to dispatch. - * + * *

Blocking Write

- * The write contract is that the callback complete is not called until all data has been + * The write contract is that the callback complete is not called until all data has been * written or there is a failure. For blocking this looks like: - * + * *
  * FutureCallback future = new FutureCallback<>();
  * endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
  * String context = future.get(); // This blocks
  * 
- * + * *

Dispatched Write

- * Note also that multiple buffers may be passed in write so that gather writes + * Note also that multiple buffers may be passed in write so that gather writes * can be done: *
  * endpoint.write("ContextObj",new ExecutorCallback(executor)
@@ -66,7 +66,7 @@ import org.eclipse.jetty.util.FutureCallback;
  *   }
  *   public void onFailed(String context,Throwable cause) {...}
  * },headerBuffer,contentBuffer);
- * + * */ public interface AsyncEndPoint extends EndPoint { @@ -83,7 +83,7 @@ public interface AsyncEndPoint extends EndPoint /* ------------------------------------------------------------ */ /** Asynchronous write operation. *

- * This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data + * This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data * has been flushed or an error occurs. * @param context Context to return via the callback * @param callback The callback to call when an error occurs or we are readable. @@ -102,7 +102,8 @@ public interface AsyncEndPoint extends EndPoint AsyncConnection getAsyncConnection(); void setAsyncConnection(AsyncConnection connection); - + + void onOpen(); + void onClose(); - } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index ef0a72a4a9b..0efdb350ef9 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -116,13 +116,13 @@ public interface EndPoint * extraordinary handling takes place. * @return the max idle time in ms or if ms <= 0 implies an infinite timeout */ - int getMaxIdleTime(); + long getMaxIdleTime(); /* ------------------------------------------------------------ */ /** Set the max idle time. * @param timeMs the max idle time in MS. Timeout <= 0 implies an infinite timeout */ - void setMaxIdleTime(int timeMs); + void setMaxIdleTime(long timeMs); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index f4ac5a7afc4..e178458c660 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -60,7 +60,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, return false; } }; - + /* ------------------------------------------------------------ */ private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @@ -73,7 +73,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, }; /* ------------------------------------------------------------ */ - public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, int maxIdleTime) throws IOException + public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, long maxIdleTime) throws IOException { super(channel); _manager = selectSet.getManager(); @@ -97,7 +97,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, { _writeFlusher.write(context,callback,buffers); } - + /* ------------------------------------------------------------ */ @Override public AsyncConnection getAsyncConnection() @@ -171,7 +171,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, public void checkReadWriteTimeout(long now) { synchronized (this) - { + { if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) { long idleTimestamp = getIdleTimestamp(); @@ -196,7 +196,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } } - + /* ------------------------------------------------------------ */ @Override protected void shutdownInput() @@ -204,7 +204,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, super.shutdownInput(); updateKey(); } - + /* ------------------------------------------------------------ */ /** * Updates selection key. This method schedules a call to doUpdateKey to do the keyChange @@ -327,6 +327,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, updateKey(); } + @Override + public void onOpen() + { + } + /* ------------------------------------------------------------ */ @Override public void onClose() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index addc1ba4b0c..fcd5c3da826 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -51,6 +51,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private final ManagedSelector[] _selectSets; private long _selectSetIndex; + private volatile long _maxIdleTime; protected SelectorManager() { @@ -62,12 +63,19 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa this._selectSets = new ManagedSelector[selectors]; } - /** * @return the max idle time */ - protected abstract int getMaxIdleTime(); - + protected long getMaxIdleTime() + { + return _maxIdleTime; + } + + public void setMaxIdleTime(long maxIdleTime) + { + _maxIdleTime = maxIdleTime; + } + protected abstract void execute(Runnable task); /** @@ -139,18 +147,27 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa /** * @param endpoint the endPoint being opened */ - protected abstract void endPointOpened(AsyncEndPoint endpoint); + protected void endPointOpened(AsyncEndPoint endpoint) + { + endpoint.getAsyncConnection().onOpen(); + } /** * @param endpoint the endPoint being closed */ - protected abstract void endPointClosed(AsyncEndPoint endpoint); + protected void endPointClosed(AsyncEndPoint endpoint) + { + endpoint.getAsyncConnection().onClose(); + endpoint.onClose(); + } /** * @param endpoint the endPoint being upgraded * @param oldConnection the previous connection */ - protected abstract void endPointUpgraded(AsyncEndPoint endpoint,AsyncConnection oldConnection); + protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) + { + } /** * @param channel the socket channel diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java deleted file mode 100644 index d640f53bd77..00000000000 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/BufferUtilTest.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.eclipse.jetty.io; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; - -import org.eclipse.jetty.util.BufferUtil; -import org.junit.Test; - -public class BufferUtilTest -{ - @Test - public void testPut() throws Exception - { - ByteBuffer to = BufferUtil.allocate(10); - ByteBuffer from=BufferUtil.toBuffer("12345"); - - BufferUtil.clear(to); - assertEquals(5,BufferUtil.append(from,to)); - assertTrue(BufferUtil.isEmpty(from)); - assertEquals("12345",BufferUtil.toString(to)); - - from=BufferUtil.toBuffer("XX67890ZZ"); - from.position(2); - - assertEquals(5,BufferUtil.append(from,to)); - assertEquals(2,from.remaining()); - assertEquals("1234567890",BufferUtil.toString(to)); - } - - - @Test - public void testPutDirect() throws Exception - { - ByteBuffer to = BufferUtil.allocateDirect(10); - ByteBuffer from=BufferUtil.toBuffer("12345"); - - BufferUtil.clear(to); - assertEquals(5,BufferUtil.append(from,to)); - assertTrue(BufferUtil.isEmpty(from)); - assertEquals("12345",BufferUtil.toString(to)); - - from=BufferUtil.toBuffer("XX67890ZZ"); - from.position(2); - - assertEquals(5,BufferUtil.append(from,to)); - assertEquals(2,from.remaining()); - assertEquals("1234567890",BufferUtil.toString(to)); - } - -} diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 1e6586962a4..a8a6967c865 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -1,11 +1,5 @@ package org.eclipse.jetty.io; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; @@ -30,42 +24,25 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class SelectChannelEndPointTest { protected volatile AsyncEndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); - private int maxIdleTimeout = 600000; // TODO: use smaller value protected SelectorManager _manager = new SelectorManager() { - @Override - protected int getMaxIdleTime() - { - return maxIdleTimeout; - } - @Override protected void execute(Runnable task) { _threadPool.execute(task); } - @Override - protected void endPointClosed(AsyncEndPoint endpoint) - { - } - - @Override - protected void endPointOpened(AsyncEndPoint endpoint) - { - endpoint.getAsyncConnection().onOpen(); - } - - @Override - protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) - { - } - @Override public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { @@ -81,6 +58,9 @@ public class SelectChannelEndPointTest return endp; } }; + { + _manager.setMaxIdleTime(600000); // TODO: use smaller value + } // Must be volatile or the test may fail spuriously protected volatile int _blockAt=0; @@ -350,12 +330,12 @@ public class SelectChannelEndPointTest _blockAt=10; clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); - + while(_lastEndp==null); _lastEndp.setMaxIdleTime(10*specifiedTimeout); Thread.sleep((11*specifiedTimeout)/10); - + long start=System.currentTimeMillis(); try { @@ -367,7 +347,7 @@ public class SelectChannelEndPointTest int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue(); Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3*specifiedTimeout/4)); } - + // write remaining characters clientOutputStream.write("90ABCDEF".getBytes("UTF-8")); clientOutputStream.flush(); @@ -538,13 +518,13 @@ public class SelectChannelEndPointTest //if (latch.getCount()%1000==0) // System.out.println(writes-latch.getCount()); - + latch.countDown(); } } catch(Throwable e) { - + long now = System.currentTimeMillis(); System.err.println("count="+count); System.err.println("latch="+latch.getCount()); @@ -552,7 +532,7 @@ public class SelectChannelEndPointTest System.err.println("last="+(now-last)); System.err.println("endp="+_lastEndp); System.err.println("conn="+_lastEndp.getAsyncConnection()); - + e.printStackTrace(); } } @@ -581,7 +561,7 @@ public class SelectChannelEndPointTest Assert.fail(); last=latch.getCount(); } - + assertEquals(0,latch.getCount()); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index d8883647ff2..24798feb5cc 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -7,7 +7,6 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; - import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSocket; @@ -32,38 +31,14 @@ public class SslConnectionTest protected volatile AsyncEndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); - private int maxIdleTimeout = 600000; // TODO: use smaller value protected SelectorManager _manager = new SelectorManager() { - @Override - protected int getMaxIdleTime() - { - return maxIdleTimeout; - } - @Override protected void execute(Runnable task) { _threadPool.execute(task); } - @Override - protected void endPointClosed(AsyncEndPoint endpoint) - { - } - - @Override - protected void endPointOpened(AsyncEndPoint endpoint) - { - // System.err.println("endPointOpened"); - endpoint.getAsyncConnection().onOpen(); - } - - @Override - protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) - { - } - @Override public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { @@ -89,12 +64,15 @@ public class SslConnectionTest return endp; } }; + { + _manager.setMaxIdleTime(600000); // TODO: use smaller value + } // Must be volatile or the test may fail spuriously protected volatile int _blockAt=0; private volatile int _writeCount=1; - + @BeforeClass public static void initSslEngine() throws Exception { @@ -147,7 +125,7 @@ public class SslConnectionTest { // System.err.println("onClose"); } - + @Override public synchronized void onFillable() { @@ -171,7 +149,7 @@ public class SslConnectionTest } // System.err.println(BufferUtil.toDetailString(_in)); - + // Write everything int l=_in.remaining(); if (l>0) @@ -181,7 +159,7 @@ public class SslConnectionTest blockingWrite.get(); // System.err.println("wrote "+l); } - + // are we done? if (endp.isInputShutdown()) { @@ -216,7 +194,7 @@ public class SslConnectionTest public void testHelloWorld() throws Exception { //Log.getRootLogger().setDebugEnabled(true); - + // Log.getRootLogger().setDebugEnabled(true); Socket client = newClient(); // System.err.println("client="+client); @@ -225,24 +203,24 @@ public class SslConnectionTest SocketChannel server = _connector.accept(); server.configureBlocking(false); _manager.accept(server); - + client.getOutputStream().write("HelloWorld".getBytes("UTF-8")); // System.err.println("wrote"); byte[] buffer = new byte[1024]; int len = client.getInputStream().read(buffer); // System.err.println(new String(buffer,0,len,"UTF-8")); - + client.close(); - + } - - + + @Test @Ignore public void testNasty() throws Exception { //Log.getRootLogger().setDebugEnabled(true); - + // Log.getRootLogger().setDebugEnabled(true); final Socket client = newClient(); // System.err.println("client="+client); @@ -251,7 +229,7 @@ public class SslConnectionTest SocketChannel server = _connector.accept(); server.configureBlocking(false); _manager.accept(server); - + new Thread() { public void run() @@ -277,7 +255,7 @@ public class SslConnectionTest } } }.start(); - + for (int i=0;i<100000;i++) { client.getOutputStream().write(("HelloWorld "+i+"\n").getBytes("UTF-8")); @@ -285,10 +263,10 @@ public class SslConnectionTest if (i%1000==0) Thread.sleep(10); } - + Thread.sleep(20000); client.close(); - + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 224108809f6..86ddfdcd3f3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -131,7 +131,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co { return _byteBufferPool; } - + /* ------------------------------------------------------------ */ public void setByteBufferPool(ByteBufferPool byteBufferPool) { @@ -174,7 +174,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co public void open() throws IOException { } - + /* ------------------------------------------------------------ */ public void close() throws IOException { @@ -185,13 +185,13 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co { return -1; } - + /* ------------------------------------------------------------ */ /** * @return Returns the maxIdleTime. */ @Override - public int getMaxIdleTime() + public long getMaxIdleTime() { return _maxIdleTime; } @@ -278,10 +278,10 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co if (_name==null) _name = (getHost() == null?"0.0.0.0":getHost()) + ":" + getPort(); - + // open listener port open(); - + _name=_name+"/"+getLocalPort(); super.doStart(); @@ -316,7 +316,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co if (thread != null) thread.interrupt(); } - + int i=_name.lastIndexOf("/"); if (i>0) _name=_name.substring(0,i); @@ -437,8 +437,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co /* ------------------------------------------------------------ */ protected void connectionOpened(AsyncConnection connection) { - // TODO: should we dispatch the call to onOpen() to another thread ? - connection.onOpen(); _stats.connectionOpened(); } @@ -453,9 +451,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co /* ------------------------------------------------------------ */ protected void connectionClosed(AsyncConnection connection) { - // TODO: should we dispatch the call to onClose() to another thread ? - connection.onClose(); - long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp(); // TODO: remove casts to HttpConnection int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java index 6c42ac4fefe..e24f6c4d955 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java @@ -4,11 +4,11 @@ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at +// The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. +// You may elect to redistribute this code under either of these licenses. // ======================================================================== package org.eclipse.jetty.server; @@ -21,47 +21,47 @@ import org.eclipse.jetty.util.component.LifeCycle; /** HTTP Connector. * Implementations of this interface provide connectors for the HTTP protocol. - * A connector receives requests (normally from a socket) and calls the + * A connector receives requests (normally from a socket) and calls the * handle method of the Handler object. These operations are performed using * threads from the ThreadPool set on the connector. - * + * * When a connector is registered with an instance of Server, then the server * will set itself as both the ThreadPool and the Handler. Note that a connector * can be used without a Server if a thread pool and handler are directly provided. - * + * */ public interface Connector extends LifeCycle -{ +{ /* ------------------------------------------------------------ */ /** * @return the name of the connector. Defaults to the HostName:port */ String getName(); - + /* ------------------------------------------------------------ */ Server getServer(); /* ------------------------------------------------------------ */ - Executor findExecutor(); - + Executor findExecutor(); + /* ------------------------------------------------------------ */ - Executor getExecutor(); - + Executor getExecutor(); + /* ------------------------------------------------------------ */ - ByteBufferPool getByteBufferPool(); - + ByteBufferPool getByteBufferPool(); + /* ------------------------------------------------------------ */ /** * @return Max Idle time for connections in milliseconds */ - int getMaxIdleTime(); - + long getMaxIdleTime(); + /* ------------------------------------------------------------ */ /** * @return the underlying socket, channel, buffer etc. for the connector. */ Object getTransport(); - + /* ------------------------------------------------------------ */ Statistics getStatistics(); @@ -69,28 +69,28 @@ public interface Connector extends LifeCycle { /* ------------------------------------------------------------ */ /** - * Opens the connector + * Opens the connector * @throws IOException */ void open() throws IOException; /* ------------------------------------------------------------ */ void close(); - + /* ------------------------------------------------------------ */ /** - * @return The hostname representing the interface to which + * @return The hostname representing the interface to which * this connector will bind, or null for all interfaces. */ String getHost(); - + /* ------------------------------------------------------------ */ /** * @return The configured port for the connector or 0 if any available * port may be used. */ int getPort(); - + /* ------------------------------------------------------------ */ /** * @return The actual port the connector is listening on or @@ -98,20 +98,20 @@ public interface Connector extends LifeCycle */ int getLocalPort(); } - + interface Statistics extends LifeCycle { /* ------------------------------------------------------------ */ - /** + /** * @return True if statistics collection is turned on. */ boolean getStatsOn(); - + /* ------------------------------------------------------------ */ /** Reset statistics. */ void statsReset(); - + /* ------------------------------------------------------------ */ /** * @return Get the number of messages received by this connector @@ -119,7 +119,7 @@ public interface Connector extends LifeCycle * is undefined. */ public int getMessagesIn(); - + /* ------------------------------------------------------------ */ /** * @return Get the number of messages sent by this connector @@ -127,7 +127,7 @@ public interface Connector extends LifeCycle * is undefined. */ public int getMessagesOut(); - + /* ------------------------------------------------------------ */ /** * @return Get the number of bytes received by this connector @@ -135,7 +135,7 @@ public interface Connector extends LifeCycle * is undefined. */ public int getBytesIn(); - + /* ------------------------------------------------------------ */ /** * @return Get the number of bytes sent by this connector @@ -151,42 +151,42 @@ public interface Connector extends LifeCycle public long getConnectionsDurationTotal(); /* ------------------------------------------------------------ */ - /** + /** * @return Number of connections accepted by the server since * statsReset() called. Undefined if setStatsOn(false). */ public int getConnections() ; /* ------------------------------------------------------------ */ - /** + /** * @return Number of connections currently open that were opened * since statsReset() called. Undefined if setStatsOn(false). */ public int getConnectionsOpen() ; /* ------------------------------------------------------------ */ - /** + /** * @return Maximum number of connections opened simultaneously * since statsReset() called. Undefined if setStatsOn(false). */ public int getConnectionsOpenMax() ; /* ------------------------------------------------------------ */ - /** + /** * @return Maximum duration in milliseconds of an open connection * since statsReset() called. Undefined if setStatsOn(false). */ public long getConnectionsDurationMax(); /* ------------------------------------------------------------ */ - /** + /** * @return Mean duration in milliseconds of open connections * since statsReset() called. Undefined if setStatsOn(false). */ public double getConnectionsDurationMean() ; /* ------------------------------------------------------------ */ - /** + /** * @return Standard deviation of duration in milliseconds of * open connections since statsReset() called. Undefined if * setStatsOn(false). @@ -194,28 +194,28 @@ public interface Connector extends LifeCycle public double getConnectionsDurationStdDev() ; /* ------------------------------------------------------------ */ - /** + /** * @return Mean number of messages received per connection * since statsReset() called. Undefined if setStatsOn(false). */ public double getConnectionsMessagesInMean() ; /* ------------------------------------------------------------ */ - /** + /** * @return Standard Deviation of number of messages received per connection * since statsReset() called. Undefined if setStatsOn(false). */ public double getConnectionsMessagesInStdDev() ; /* ------------------------------------------------------------ */ - /** + /** * @return Maximum number of messages received per connection * since statsReset() called. Undefined if setStatsOn(false). */ public int getConnectionsMessagesInMax(); - + /* ------------------------------------------------------------ */ - /** + /** * @return Timestamp stats were started at. */ public long getStatsOnMs(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java index 7f522303884..a9a107ce39b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java @@ -39,7 +39,7 @@ public class LocalHttpConnector extends HttpConnector { setMaxIdleTime(30000); } - + /* ------------------------------------------------------------ */ @Override public Object getTransport() @@ -70,7 +70,7 @@ public class LocalHttpConnector extends HttpConnector { LOG.debug("getResponses"); Phaser phaser=_executor._phaser; - int phase = phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes + int phase = phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes LocalEndPoint request = new LocalEndPoint(); request.setInput(requestsBuffer); _connects.add(request); @@ -80,7 +80,7 @@ public class LocalHttpConnector extends HttpConnector /* ------------------------------------------------------------ */ /** - * Execute a request and return the EndPoint through which + * Execute a request and return the EndPoint through which * responses can be received. * @param rawRequest * @return @@ -88,7 +88,7 @@ public class LocalHttpConnector extends HttpConnector public LocalEndPoint executeRequest(String rawRequest) { Phaser phaser=_executor._phaser; - int phase = phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes + int phase = phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes LocalEndPoint endp = new LocalEndPoint(); endp.setInput(BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET)); _connects.add(endp); @@ -106,7 +106,7 @@ public class LocalHttpConnector extends HttpConnector connectionOpened(connection); _executor._phaser.arriveAndDeregister(); // arrive for the register done in getResponses } - + /* ------------------------------------------------------------ */ @Override protected void doStart() throws Exception @@ -141,14 +141,14 @@ public class LocalHttpConnector extends HttpConnector { return false; } - + }; final Executor _executor; LocalExecutor(Executor e) { _executor=e; } - + @Override public void execute(final Runnable task) { @@ -167,7 +167,7 @@ public class LocalHttpConnector extends HttpConnector { _phaser.arriveAndDeregister(); } - } + } }); } } @@ -176,7 +176,7 @@ public class LocalHttpConnector extends HttpConnector public class LocalEndPoint extends AsyncByteArrayEndPoint { private CountDownLatch _closed = new CountDownLatch(1); - + LocalEndPoint() { super(getTimer()); @@ -204,7 +204,6 @@ public class LocalHttpConnector extends HttpConnector public void onClose() { super.onClose(); - connectionClosed(getAsyncConnection()); _closed.countDown(); } @@ -237,5 +236,5 @@ public class LocalHttpConnector extends HttpConnector } } } - } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java index 52d545479ed..1dc74c80365 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java @@ -70,7 +70,7 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto this(Math.max(1,(Runtime.getRuntime().availableProcessors())/4), Math.max(1,(Runtime.getRuntime().availableProcessors())/4)); } - + /* ------------------------------------------------------------ */ public SelectChannelConnector(int acceptors, int selectors) { @@ -78,7 +78,7 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto _manager=new ConnectorSelectorManager(selectors); addBean(_manager,true); } - + /* ------------------------------------------------------------ */ @Override @@ -210,7 +210,6 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto /* ------------------------------------------------------------------------------- */ protected void endPointClosed(AsyncEndPoint endpoint) { - endpoint.onClose(); connectionClosed(endpoint.getAsyncConnection()); } @@ -237,15 +236,16 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto findExecutor().execute(task); } - @Override - protected int getMaxIdleTime() + @Override + protected long getMaxIdleTime() { return SelectChannelConnector.this.getMaxIdleTime(); } - + @Override protected void endPointClosed(AsyncEndPoint endpoint) { + super.endPointClosed(endpoint); SelectChannelConnector.this.endPointClosed(endpoint); } @@ -253,6 +253,7 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto protected void endPointOpened(AsyncEndPoint endpoint) { // TODO handle max connections and low resources + super.endPointOpened(endpoint); connectionOpened(endpoint.getAsyncConnection()); } diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index d0fecdb78ca..4a2c5daba48 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -287,22 +287,6 @@ public class SPDYClient return result; } - @Override - protected void endPointOpened(AsyncEndPoint endpoint) - { - } - - @Override - protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) - { - } - - @Override - protected void endPointClosed(AsyncEndPoint endpoint) - { - endpoint.getAsyncConnection().onClose(); - } - @Override public AsyncConnection newConnection(final SocketChannel channel, AsyncEndPoint endPoint, Object attachment) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java index d319b2bc04f..96627ed2259 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BufferUtil.java @@ -649,6 +649,34 @@ public class BufferUtil { return ByteBuffer.wrap(s.getBytes(charset)); } + + /** + * Create a new ByteBuffer using provided byte array. + * + * @param array + * the byte array to back buffer with. + * @return ByteBuffer with provided byte array, in flush mode + */ + public static ByteBuffer toBuffer(byte array[]) + { + return ByteBuffer.wrap(array); + } + + /** + * Create a new ByteBuffer using the provided byte array. + * + * @param array + * the byte array to use. + * @param offset + * the offset within the byte array to use from + * @param length + * the length in bytes of the array to use + * @return ByteBuffer with provided byte array, in flush mode + */ + public static ByteBuffer toBuffer(byte array[], int offset, int length) + { + return ByteBuffer.wrap(array,offset,length); + } public static ByteBuffer toBuffer(File file) throws IOException { diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/BufferUtilTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/BufferUtilTest.java index 4ebd302668c..2d70a698d06 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/BufferUtilTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/BufferUtilTest.java @@ -14,15 +14,14 @@ package org.eclipse.jetty.util; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.nio.ByteBuffer; +import java.util.Arrays; +import org.junit.Assert; import org.junit.Test; -/** - * - */ public class BufferUtilTest { @Test @@ -119,4 +118,81 @@ public class BufferUtilTest assertEquals("t"+i,str[i],BufferUtil.toString(buffer)); } } + + @Test + public void testPut() throws Exception + { + ByteBuffer to = BufferUtil.allocate(10); + ByteBuffer from=BufferUtil.toBuffer("12345"); + + BufferUtil.clear(to); + assertEquals(5,BufferUtil.append(from,to)); + assertTrue(BufferUtil.isEmpty(from)); + assertEquals("12345",BufferUtil.toString(to)); + + from=BufferUtil.toBuffer("XX67890ZZ"); + from.position(2); + + assertEquals(5,BufferUtil.append(from,to)); + assertEquals(2,from.remaining()); + assertEquals("1234567890",BufferUtil.toString(to)); + } + + @Test + public void testPutDirect() throws Exception + { + ByteBuffer to = BufferUtil.allocateDirect(10); + ByteBuffer from=BufferUtil.toBuffer("12345"); + + BufferUtil.clear(to); + assertEquals(5,BufferUtil.append(from,to)); + assertTrue(BufferUtil.isEmpty(from)); + assertEquals("12345",BufferUtil.toString(to)); + + from=BufferUtil.toBuffer("XX67890ZZ"); + from.position(2); + + assertEquals(5,BufferUtil.append(from,to)); + assertEquals(2,from.remaining()); + assertEquals("1234567890",BufferUtil.toString(to)); + } + + @Test + public void testToBuffer_Array() + { + byte arr[] = new byte[128]; + Arrays.fill(arr,(byte)0x44); + ByteBuffer buf = BufferUtil.toBuffer(arr); + + int count = 0; + while (buf.remaining() > 0) + { + byte b = buf.get(); + Assert.assertEquals(b,0x44); + count++; + } + + Assert.assertEquals("Count of bytes",arr.length,count); + } + + @Test + public void testToBuffer_ArrayOffsetLength() + { + byte arr[] = new byte[128]; + Arrays.fill(arr,(byte)0xFF); // fill whole thing with FF + int offset = 10; + int length = 100; + Arrays.fill(arr,offset,offset + length,(byte)0x77); // fill partial with 0x77 + ByteBuffer buf = BufferUtil.toBuffer(arr,offset,length); + + int count = 0; + while (buf.remaining() > 0) + { + byte b = buf.get(); + Assert.assertEquals(b,0x77); + count++; + } + + Assert.assertEquals("Count of bytes",length,count); + } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java index 4cf30d360ae..9add9441c32 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java @@ -21,7 +21,6 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; - import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; @@ -51,36 +50,12 @@ public class WebSocketClientSelectorManager extends SelectorManager this.executor = executor; } - @Override - protected void endPointClosed(AsyncEndPoint endpoint) - { - endpoint.getAsyncConnection().onClose(); - } - - @Override - protected void endPointOpened(AsyncEndPoint endpoint) - { - } - - @Override - protected void endPointUpgraded(AsyncEndPoint endpoint, AsyncConnection oldConnection) - { - // TODO Investigate role of this with websocket - - } - @Override protected void execute(Runnable task) { // TODO Auto-generated method stub } - @Override - protected int getMaxIdleTime() - { - return 0; - } - public SslContextFactory getSslContextFactory() { return sslContextFactory; diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java index 5a43f07e97a..fcf4be548c3 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/DataFrameBytes.java @@ -19,10 +19,13 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.protocol.WebSocketFrame; public class DataFrameBytes extends FrameBytes { + private static final Logger LOG = Log.getLogger(DataFrameBytes.class); private int size; private ByteBuffer buffer; @@ -34,6 +37,11 @@ public class DataFrameBytes extends FrameBytes @Override public void completed(C context) { + if (LOG.isDebugEnabled()) + { + LOG.debug("completed({}) - frame.remaining() = {}",context,frame.remaining()); + } + connection.getBufferPool().release(buffer); if (frame.remaining() > 0) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index e7d5315f55e..10acc1d4dab 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -340,14 +340,17 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements private void write(ByteBuffer buffer, WebSocketAsyncConnection webSocketAsyncConnection, FrameBytes frameBytes) { + AsyncEndPoint endpoint = getEndPoint(); + if (LOG.isDebugEnabled()) { LOG.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes); - LOG.debug("EndPoint: {}",getEndPoint()); + LOG.debug("EndPoint: {}",endpoint); } try { - getEndPoint().write(frameBytes.context,frameBytes,buffer); + endpoint.write(frameBytes.context,frameBytes,buffer); + // endpoint.flush(); } catch (Throwable t) { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Generator.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Generator.java index 1a39f17253c..26191239469 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Generator.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/Generator.java @@ -166,10 +166,13 @@ public class Generator */ public ByteBuffer generate(int bufferSize, WebSocketFrame frame) { - LOG.debug(String.format("Generate.Frame[opcode=%s,fin=%b,cont=%b,rsv1=%b,rsv2=%b,rsv3=%b,mask=%b,plength=%d]",frame.getOpCode().toString(), - frame.isFin(),frame.isContinuation(),frame.isRsv1(),frame.isRsv2(),frame.isRsv3(),frame.isMasked(),frame.getPayloadLength())); - - assertFrameValid(frame); + if (LOG.isDebugEnabled()) + { + LOG.debug(String.format( + "Generate.Frame[opcode=%s,fin=%b,cont=%b,rsv1=%b,rsv2=%b,rsv3=%b,mask=%b,plength=%d,payloadStart=%s,remaining=%d,position=%s]",frame + .getOpCode().toString(),frame.isFin(),frame.isContinuation(),frame.isRsv1(),frame.isRsv2(),frame.isRsv3(),frame.isMasked(),frame + .getPayloadLength(),frame.getPayloadStart(),frame.remaining(),frame.position())); + } /* * prepare the byte buffer to put frame into @@ -177,117 +180,138 @@ public class Generator ByteBuffer buffer = bufferPool.acquire(bufferSize,true); BufferUtil.clearToFill(buffer); - /* - * start the generation process - */ - byte b; - - // Setup fin thru opcode - b = 0x00; - if (frame.isFin()) + if (frame.remaining() == frame.getPayloadLength()) { - b |= 0x80; // 1000_0000 - } - if (frame.isRsv1()) - { - b |= 0x40; // 0100_0000 - } - if (frame.isRsv2()) - { - b |= 0x20; // 0010_0000 - } - if (frame.isRsv3()) - { - b |= 0x10; - } + // we need a framing header + assertFrameValid(frame); - byte opcode = frame.getOpCode().getCode(); + /* + * start the generation process + */ + byte b; - if (frame.isContinuation()) - { - // Continuations are not the same OPCODE - opcode = OpCode.CONTINUATION.getCode(); - } + // Setup fin thru opcode + b = 0x00; + if (frame.isFin()) + { + b |= 0x80; // 1000_0000 + } + if (frame.isRsv1()) + { + b |= 0x40; // 0100_0000 + } + if (frame.isRsv2()) + { + b |= 0x20; // 0010_0000 + } + if (frame.isRsv3()) + { + b |= 0x10; + } - b |= opcode & 0x0F; + byte opcode = frame.getOpCode().getCode(); - buffer.put(b); + if (frame.isContinuation()) + { + // Continuations are not the same OPCODE + opcode = OpCode.CONTINUATION.getCode(); + } - // is masked - b = 0x00; - b |= (frame.isMasked()?0x80:0x00); + b |= opcode & 0x0F; - // payload lengths - int payloadLength = frame.getPayloadLength(); - - /* - * if length is over 65535 then its a 7 + 64 bit length - */ - if (payloadLength > 0xFF_FF) - { - // we have a 64 bit length - b |= 0x7F; - buffer.put(b); // indicate 8 byte length - buffer.put((byte)0); // - buffer.put((byte)0); // anything over an - buffer.put((byte)0); // int is just - buffer.put((byte)0); // intsane! - buffer.put((byte)((payloadLength >> 24) & 0xFF)); - buffer.put((byte)((payloadLength >> 16) & 0xFF)); - buffer.put((byte)((payloadLength >> 8) & 0xFF)); - buffer.put((byte)(payloadLength & 0xFF)); - } - /* - * if payload is ge 126 we have a 7 + 16 bit length - */ - else if (payloadLength >= 0x7E) - { - b |= 0x7E; - buffer.put(b); // indicate 2 byte length - buffer.put((byte)(payloadLength >> 8)); - buffer.put((byte)(payloadLength & 0xFF)); - } - /* - * we have a 7 bit length - */ - else - { - b |= (payloadLength & 0x7F); buffer.put(b); - } - // masking key - if (frame.isMasked()) - { - buffer.put(frame.getMask()); - } + // is masked + b = 0x00; + b |= (frame.isMasked()?0x80:0x00); - // remember the position - int positionPrePayload = buffer.position(); + // payload lengths + int payloadLength = frame.getPayloadLength(); + + /* + * if length is over 65535 then its a 7 + 64 bit length + */ + if (payloadLength > 0xFF_FF) + { + // we have a 64 bit length + b |= 0x7F; + buffer.put(b); // indicate 8 byte length + buffer.put((byte)0); // + buffer.put((byte)0); // anything over an + buffer.put((byte)0); // int is just + buffer.put((byte)0); // intsane! + buffer.put((byte)((payloadLength >> 24) & 0xFF)); + buffer.put((byte)((payloadLength >> 16) & 0xFF)); + buffer.put((byte)((payloadLength >> 8) & 0xFF)); + buffer.put((byte)(payloadLength & 0xFF)); + } + /* + * if payload is ge 126 we have a 7 + 16 bit length + */ + else if (payloadLength >= 0x7E) + { + b |= 0x7E; + buffer.put(b); // indicate 2 byte length + buffer.put((byte)(payloadLength >> 8)); + buffer.put((byte)(payloadLength & 0xFF)); + } + /* + * we have a 7 bit length + */ + else + { + b |= (payloadLength & 0x7F); + buffer.put(b); + } + + // masking key + if (frame.isMasked()) + { + buffer.put(frame.getMask()); + } + } // copy payload if (frame.hasPayload()) { - buffer.put(frame.getPayload()); - } + // remember the position + int maskingStartPosition = buffer.position(); - int positionPostPayload = buffer.position(); + // remember the offset within the frame payload (for working with + // windowed frames that don't split on 4 byte barriers) + int payloadOffset = frame.getPayload().position(); + int payloadStart = frame.getPayloadStart(); - // mask it if needed - if (frame.isMasked()) - { - // move back to remembered position. - int size = positionPostPayload - positionPrePayload; - byte[] mask = frame.getMask(); - int pos; - for (int i = 0; i < size; i++) + // put as much as possible into the buffer + BufferUtil.put(frame.getPayload(),buffer); + + // mask it if needed + if (frame.isMasked()) { - pos = positionPrePayload + i; - // Mask each byte by its absolute position in the bytebuffer - buffer.put(pos,(byte)(buffer.get(pos) ^ mask[i % 4])); + // move back to remembered position. + int size = buffer.position() - maskingStartPosition; + byte[] mask = frame.getMask(); + byte b; + int posBuf; + int posFrame; + for (int i = 0; i < size; i++) + { + posBuf = i + maskingStartPosition; + posFrame = i + (payloadOffset - payloadStart); + + // get raw byte from buffer. + b = buffer.get(posBuf); + + // mask, using offset information from frame windowing. + b ^= mask[posFrame % 4]; + + // Mask each byte by its absolute position in the bytebuffer + buffer.put(posBuf,b); + } } } + BufferUtil.flipToFlush(buffer,0); return buffer; } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java index 2dcd1601bb9..c56e428760d 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/protocol/WebSocketFrame.java @@ -93,6 +93,10 @@ public class WebSocketFrame implements Frame * It is assumed to always be in FLUSH mode (ready to read) in this object. */ private ByteBuffer data; + private int payloadLength = 0; + /** position of start of data within a fresh payload */ + private int payloadStart = -1; + private boolean continuation = false; private int continuationIndex = 0; @@ -207,12 +211,20 @@ public class WebSocketFrame implements Frame return opcode; } + /** + * Get the payload ByteBuffer. possible null. + *

+ * + * @return A {@link ByteBuffer#slice()} of the payload buffer (to prevent modification of the buffer state). Possibly null if no payload present. + *

+ * Note: this method is exposed via the immutable {@link Frame#getPayload()} method. + */ @Override public ByteBuffer getPayload() { if (data != null) { - return data.slice(); + return data; } else { @@ -236,12 +248,21 @@ public class WebSocketFrame implements Frame { return 0; } - return data.remaining(); + return payloadLength; + } + + public int getPayloadStart() + { + if (data == null) + { + return -1; + } + return payloadStart; } public boolean hasPayload() { - return ((data != null) && (data.remaining() > 0)); + return ((data != null) && (payloadLength > 0)); } public boolean isContinuation() @@ -284,6 +305,29 @@ public class WebSocketFrame implements Frame return rsv3; } + /** + * Get the position currently within the payload data. + *

+ * Used by flow control, generator and window sizing. + * + * @return the number of bytes remaining in the payload data that has not yet been written out to Network ByteBuffers. + */ + public int position() + { + if (data == null) + { + return -1; + } + return data.position(); + } + + /** + * Get the number of bytes remaining to write out to the Network ByteBuffer. + *

+ * Used by flow control, generator and window sizing. + * + * @return the number of bytes remaining in the payload data that has not yet been written out to Network ByteBuffers. + */ public int remaining() { if (data == null) @@ -302,6 +346,7 @@ public class WebSocketFrame implements Frame opcode = null; masked = false; data = null; + payloadLength = 0; mask = null; continuationIndex = 0; continuation = false; @@ -366,11 +411,9 @@ public class WebSocketFrame implements Frame } } - int len = buf.length; - data = ByteBuffer.allocate(len); - BufferUtil.clearToFill(data); - data.put(buf,0,len); - BufferUtil.flipToFlush(data,0); + data = BufferUtil.toBuffer(buf); + payloadStart = data.position(); + payloadLength = data.limit(); return this; } @@ -396,10 +439,9 @@ public class WebSocketFrame implements Frame } } - data = ByteBuffer.allocate(len); - BufferUtil.clearToFill(data); - data.put(buf,0,len); - BufferUtil.flipToFlush(data,0); + data = BufferUtil.toBuffer(buf,offset,len); + payloadStart = data.position(); + payloadLength = data.limit(); return this; } @@ -430,6 +472,8 @@ public class WebSocketFrame implements Frame } data = buf.slice(); + payloadStart = data.position(); + payloadLength = data.limit(); return this; } @@ -470,7 +514,7 @@ public class WebSocketFrame implements Frame b.append("NO-OP"); } b.append('['); - b.append("len=").append(getPayloadLength()); + b.append("len=").append(payloadLength); b.append(",fin=").append(fin); b.append(",masked=").append(masked); b.append(",continuation=").append(continuation); diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/AllTests.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/AllTests.java index 444448b8fb2..9f4cddc52fb 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/AllTests.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/AllTests.java @@ -2,22 +2,13 @@ package org.eclipse.jetty.websocket; import org.eclipse.jetty.websocket.driver.EventMethodsCacheTest; import org.eclipse.jetty.websocket.driver.WebSocketEventDriverTest; -import org.eclipse.jetty.websocket.protocol.AcceptHashTest; -import org.eclipse.jetty.websocket.protocol.ClosePayloadParserTest; -import org.eclipse.jetty.websocket.protocol.ParserTest; -import org.eclipse.jetty.websocket.protocol.PingPayloadParserTest; -import org.eclipse.jetty.websocket.protocol.RFC6455ExamplesGeneratorTest; -import org.eclipse.jetty.websocket.protocol.RFC6455ExamplesParserTest; -import org.eclipse.jetty.websocket.protocol.TextPayloadParserTest; -import org.eclipse.jetty.websocket.protocol.WebSocketFrameTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @RunWith(Suite.class) @Suite.SuiteClasses( - { org.eclipse.jetty.websocket.ab.AllTests.class, EventMethodsCacheTest.class, WebSocketEventDriverTest.class, AcceptHashTest.class, - ClosePayloadParserTest.class, ParserTest.class, PingPayloadParserTest.class, RFC6455ExamplesGeneratorTest.class, RFC6455ExamplesParserTest.class, - TextPayloadParserTest.class, WebSocketFrameTest.class, GeneratorParserRoundtripTest.class }) +{ org.eclipse.jetty.websocket.ab.AllTests.class, EventMethodsCacheTest.class, WebSocketEventDriverTest.class, + org.eclipse.jetty.websocket.protocol.AllTests.class, GeneratorParserRoundtripTest.class }) public class AllTests { /* nothing to do here */ diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/protocol/AllTests.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/protocol/AllTests.java new file mode 100644 index 00000000000..facf087b66a --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/protocol/AllTests.java @@ -0,0 +1,13 @@ +package org.eclipse.jetty.websocket.protocol; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses( +{ AcceptHashTest.class, ClosePayloadParserTest.class, GeneratorTest.class, ParserTest.class, PingPayloadParserTest.class, RFC6455ExamplesGeneratorTest.class, + RFC6455ExamplesParserTest.class, TextPayloadParserTest.class, WebSocketFrameTest.class }) +public class AllTests +{ + /* allow junit annotations to do the heavy lifting */ +} diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/protocol/GeneratorTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/protocol/GeneratorTest.java new file mode 100644 index 00000000000..7e720679a2c --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/protocol/GeneratorTest.java @@ -0,0 +1,115 @@ +package org.eclipse.jetty.websocket.protocol; + +import static org.hamcrest.Matchers.*; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.junit.Assert; +import org.junit.Test; + +public class GeneratorTest +{ + @Test + public void testWindowedGenerate() + { + byte payload[] = new byte[10240]; + Arrays.fill(payload,(byte)0x44); + + WebSocketFrame frame = WebSocketFrame.binary(payload); + + int totalParts = 0; + int totalBytes = 0; + int windowSize = 1024; + int expectedHeaderSize = 4; + int expectedParts = (int)Math.ceil((double)(payload.length + expectedHeaderSize) / windowSize); + + Generator generator = new UnitGenerator(); + + boolean done = false; + while (!done) + { + Assert.assertThat("Too many parts",totalParts,lessThan(20)); + + ByteBuffer buf = generator.generate(windowSize,frame); + // System.out.printf("Generated buf.limit() = %,d%n",buf.limit()); + + totalBytes += buf.remaining(); + totalParts++; + + done = (frame.remaining() <= 0); + } + + Assert.assertThat("Created Parts",totalParts,is(expectedParts)); + Assert.assertThat("Created Bytes",totalBytes,is(payload.length + expectedHeaderSize)); + } + + @Test + public void testWindowedGenerateWithMasking() + { + byte payload[] = new byte[10240]; + Arrays.fill(payload,(byte)0x55); + + byte mask[] = new byte[] + { 0x2A, (byte)0xF0, 0x0F, 0x00 }; + + WebSocketFrame frame = WebSocketFrame.binary(payload); + frame.setMask(mask); + + int totalParts = 0; + int totalBytes = 0; + int windowSize = 2929; // important, use an odd # window size to test masking across window barriers + int expectedHeaderSize = 8; + int expectedParts = (int)Math.ceil((double)(payload.length + expectedHeaderSize) / windowSize); + + // Buffer to capture generated bytes + ByteBuffer completeBuf = ByteBuffer.allocate(payload.length + expectedHeaderSize); + BufferUtil.clearToFill(completeBuf); + + // Generate and capture generator output + Generator generator = new UnitGenerator(); + + boolean done = false; + while (!done) + { + Assert.assertThat("Too many parts",totalParts,lessThan(20)); + + ByteBuffer buf = generator.generate(windowSize,frame); + // System.out.printf("Generated buf.limit() = %,d%n",buf.limit()); + + totalBytes += buf.remaining(); + totalParts++; + + BufferUtil.put(buf,completeBuf); + + done = (frame.remaining() <= 0); + } + + Assert.assertThat("Created Parts",totalParts,is(expectedParts)); + Assert.assertThat("Created Bytes",totalBytes,is(payload.length + expectedHeaderSize)); + + // Parse complete buffer. + WebSocketPolicy policy = WebSocketPolicy.newServerPolicy(); + Parser parser = new Parser(policy); + FrameParseCapture capture = new FrameParseCapture(); + parser.setIncomingFramesHandler(capture); + + BufferUtil.flipToFlush(completeBuf,0); + parser.parse(completeBuf); + + // Assert validity of frame + WebSocketFrame actual = capture.getFrames().get(0); + Assert.assertThat("Frame.opcode",actual.getOpCode(),is(OpCode.BINARY)); + Assert.assertThat("Frame.payloadLength",actual.getPayloadLength(),is(payload.length)); + + // Validate payload content for proper masking + ByteBuffer actualData = actual.getPayload().slice(); + Assert.assertThat("Frame.payload.remaining",actualData.remaining(),is(payload.length)); + while (actualData.remaining() > 0) + { + Assert.assertThat("Actual.payload[" + actualData.position() + "]",actualData.get(),is((byte)0x55)); + } + } +} diff --git a/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java b/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java index 71164417c7c..5666cab2d74 100644 --- a/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java +++ b/jetty-xml/src/main/java/org/eclipse/jetty/xml/XmlConfiguration.java @@ -120,7 +120,7 @@ public class XmlConfiguration __parser.redirectEntity("configure_6_0.dtd",config60); __parser.redirectEntity("configure_7_6.dtd",config76); - + __parser.redirectEntity("http://jetty.mortbay.org/configure.dtd",config76); __parser.redirectEntity("http://jetty.eclipse.org/configure.dtd",config76); __parser.redirectEntity("http://www.eclipse.org/jetty/configure.dtd",config76); @@ -303,7 +303,7 @@ public class XmlConfiguration public void init(URL url, XmlParser.Node config, Map idMap, Map properties) { - _url=url.toString(); + _url=url==null?null:url.toString(); _config=config; _idMap=idMap; _propertyMap=properties; @@ -927,7 +927,7 @@ public class XmlConfiguration configure(prop,node,0); return prop; } - + /* ------------------------------------------------------------ */ /* @@ -1097,7 +1097,7 @@ public class XmlConfiguration String defaultValue = node.getAttribute("default"); return System.getProperty(name,defaultValue); } - + if ("Env".equals(tag)) { String name = node.getAttribute("name"); diff --git a/jetty-xml/src/test/java/org/eclipse/jetty/xml/XmlConfigurationTest.java b/jetty-xml/src/test/java/org/eclipse/jetty/xml/XmlConfigurationTest.java index 16965d4201f..f8f02d071a3 100644 --- a/jetty-xml/src/test/java/org/eclipse/jetty/xml/XmlConfigurationTest.java +++ b/jetty-xml/src/test/java/org/eclipse/jetty/xml/XmlConfigurationTest.java @@ -13,15 +13,15 @@ package org.eclipse.jetty.xml; -import static junit.framework.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.net.URL; import java.util.HashMap; import java.util.Map; import org.junit.Test; +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class XmlConfigurationTest { protected String _configure="org/eclipse/jetty/xml/configure.xml"; @@ -31,14 +31,14 @@ public class XmlConfigurationTest { URL url = XmlConfigurationTest.class.getClassLoader().getResource("org/eclipse/jetty/xml/mortbay.xml"); XmlConfiguration configuration = new XmlConfiguration(url); - Object o=configuration.configure(); + configuration.configure(); } - + @Test public void testPassedObject() throws Exception { TestConfiguration.VALUE=77; - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("whatever", "xxx"); URL url = XmlConfigurationTest.class.getClassLoader().getResource(_configure); @@ -49,30 +49,30 @@ public class XmlConfigurationTest assertEquals("Set String","SetValue",tc.testObject); assertEquals("Set Type",2,tc.testInt); - + assertEquals(18080, tc.propValue); assertEquals("Put","PutValue",tc.get("Test")); assertEquals("Put dft","2",tc.get("TestDft")); - assertEquals("Put type",new Integer(2),tc.get("TestInt")); + assertEquals("Put type",2,tc.get("TestInt")); assertEquals("Trim","PutValue",tc.get("Trim")); assertEquals("Null",null,tc.get("Null")); assertEquals("NullTrim",null,tc.get("NullTrim")); - assertEquals("ObjectTrim",new Double(1.2345),tc.get("ObjectTrim")); + assertEquals("ObjectTrim",1.2345,tc.get("ObjectTrim")); assertEquals("Objects","-1String",tc.get("Objects")); assertEquals( "ObjectsTrim", "-1String",tc.get("ObjectsTrim")); assertEquals( "String", "\n PutValue\n ",tc.get("String")); assertEquals( "NullString", "",tc.get("NullString")); - assertEquals( "WhateSpace", "\n ",tc.get("WhiteSpace")); + assertEquals( "WhiteSpace", "\n ",tc.get("WhiteSpace")); assertEquals( "ObjectString", "\n 1.2345\n ",tc.get("ObjectString")); assertEquals( "ObjectsString", "-1String",tc.get("ObjectsString")); assertEquals( "ObjectsWhiteString", "-1\n String",tc.get("ObjectsWhiteString")); assertEquals( "SystemProperty", System.getProperty("user.dir")+"/stuff",tc.get("SystemProperty")); assertEquals( "Env", System.getenv("HOME"),tc.get("Env")); - + assertEquals( "Property", "xxx", tc.get("Property")); @@ -82,7 +82,7 @@ public class XmlConfigurationTest assertEquals("oa[0]","Blah",tc.oa[0]); assertEquals("oa[1]","1.2.3.4:5678",tc.oa[1]); - assertEquals("oa[2]",new Double(1.2345),tc.oa[2]); + assertEquals("oa[2]",1.2345,tc.oa[2]); assertEquals("oa[3]",null,tc.oa[3]); assertEquals("ia[0]",1,tc.ia[0]); @@ -92,25 +92,25 @@ public class XmlConfigurationTest TestConfiguration tc2=tc.nested; assertTrue(tc2!=null); - assertEquals( "Called(bool)", new Boolean(true),tc2.get("Arg")); + assertEquals( "Called(bool)",true,tc2.get("Arg")); assertEquals("nested config",null,tc.get("Arg")); - assertEquals("nested config",new Boolean(true),tc2.get("Arg")); + assertEquals("nested config",true,tc2.get("Arg")); assertEquals("nested config","Call1",tc2.testObject); assertEquals("nested config",4,tc2.testInt); assertEquals( "nested call", "http://www.eclipse.com/",tc2.url.toString()); - + assertEquals("static to field",tc.testField1,77); assertEquals("field to field",tc.testField2,2); assertEquals("literal to static",TestConfiguration.VALUE,42); } - + @Test public void testNewObject() throws Exception { TestConfiguration.VALUE=71; - Map properties = new HashMap(); + Map properties = new HashMap<>(); properties.put("whatever", "xxx"); URL url = XmlConfigurationTest.class.getClassLoader().getResource(_configure); @@ -120,23 +120,23 @@ public class XmlConfigurationTest assertEquals("Set String","SetValue",tc.testObject); assertEquals("Set Type",2,tc.testInt); - + assertEquals(18080, tc.propValue); assertEquals("Put","PutValue",tc.get("Test")); assertEquals("Put dft","2",tc.get("TestDft")); - assertEquals("Put type",new Integer(2),tc.get("TestInt")); + assertEquals("Put type",2,tc.get("TestInt")); assertEquals("Trim","PutValue",tc.get("Trim")); assertEquals("Null",null,tc.get("Null")); assertEquals("NullTrim",null,tc.get("NullTrim")); - assertEquals("ObjectTrim",new Double(1.2345),tc.get("ObjectTrim")); + assertEquals("ObjectTrim",1.2345,tc.get("ObjectTrim")); assertEquals("Objects","-1String",tc.get("Objects")); assertEquals( "ObjectsTrim", "-1String",tc.get("ObjectsTrim")); assertEquals( "String", "\n PutValue\n ",tc.get("String")); assertEquals( "NullString", "",tc.get("NullString")); - assertEquals( "WhateSpace", "\n ",tc.get("WhiteSpace")); + assertEquals( "WhiteSpace", "\n ",tc.get("WhiteSpace")); assertEquals( "ObjectString", "\n 1.2345\n ",tc.get("ObjectString")); assertEquals( "ObjectsString", "-1String",tc.get("ObjectsString")); assertEquals( "ObjectsWhiteString", "-1\n String",tc.get("ObjectsWhiteString")); @@ -151,7 +151,7 @@ public class XmlConfigurationTest assertEquals("oa[0]","Blah",tc.oa[0]); assertEquals("oa[1]","1.2.3.4:5678",tc.oa[1]); - assertEquals("oa[2]",new Double(1.2345),tc.oa[2]); + assertEquals("oa[2]",1.2345,tc.oa[2]); assertEquals("oa[3]",null,tc.oa[3]); assertEquals("ia[0]",1,tc.ia[0]); @@ -161,21 +161,21 @@ public class XmlConfigurationTest TestConfiguration tc2=tc.nested; assertTrue(tc2!=null); - assertEquals( "Called(bool)", new Boolean(true),tc2.get("Arg")); + assertEquals( "Called(bool)",true,tc2.get("Arg")); assertEquals("nested config",null,tc.get("Arg")); - assertEquals("nested config",new Boolean(true),tc2.get("Arg")); + assertEquals("nested config",true,tc2.get("Arg")); assertEquals("nested config","Call1",tc2.testObject); assertEquals("nested config",4,tc2.testInt); assertEquals( "nested call", "http://www.eclipse.com/",tc2.url.toString()); - + assertEquals("static to field",71,tc.testField1); assertEquals("field to field",2,tc.testField2); assertEquals("literal to static",42,TestConfiguration.VALUE); } - - + + @Test public void testStringConfiguration() throws Exception {