diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 667ed566a1f..7d51c2e6aeb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -25,8 +25,11 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -303,7 +306,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa */ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable { - private final ConcurrentLinkedQueue _changes = new ConcurrentLinkedQueue<>(); + private final Queue _changes = new ConcurrentLinkedQueue<>(); + private final Set _endPoints = Collections.newSetFromMap(new ConcurrentHashMap()); private final int _id; private Selector _selector; private Thread _thread; @@ -342,7 +346,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { if (Thread.currentThread() != _thread) { - _changes.add(change); + _changes.offer(change); LOG.debug("Queued change {}", change); boolean wakeup = _needsWakeup; if (wakeup) @@ -362,9 +366,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { Runnable change; while ((change = _changes.poll()) != null) - { runChange(change); - } } protected void runChange(Runnable change) @@ -531,9 +533,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException { AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey); + _endPoints.add(endPoint); endPointOpened(endPoint); - endPoint.setAsyncConnection(newConnection(channel, endPoint, selectionKey.attachment())); - endPoint.getAsyncConnection().onOpen(); + AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment()); + endPoint.setAsyncConnection(asyncConnection); + asyncConnection.onOpen(); LOG.debug("Created {}", endPoint); return endPoint; } @@ -541,6 +545,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa public void destroyEndPoint(AsyncEndPoint endPoint) { LOG.debug("Destroyed {}", endPoint); + _endPoints.remove(endPoint); endPoint.getAsyncConnection().onClose(); endPointClosed(endPoint); } @@ -607,13 +612,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private void timeoutCheck() { + // We cannot use the _selector.keys() because the returned Set is not thread + // safe so it may be modified by the selector thread while we iterate here. long now = System.currentTimeMillis(); - for (SelectionKey key : _selector.keys()) - { - Object attachment = key.attachment(); - if (attachment instanceof AsyncEndPoint) - ((AsyncEndPoint)attachment).checkTimeout(now); - } + for (AsyncEndPoint endPoint : _endPoints) + endPoint.checkTimeout(now); } private class DumpKeys implements Runnable diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java index aa08ae31a1c..6e156081da4 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java @@ -13,11 +13,6 @@ package org.eclipse.jetty.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -30,135 +25,158 @@ import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.Future; - -import junit.framework.Assert; +import java.util.concurrent.TimeUnit; import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; +import org.junit.Assert; import org.junit.Test; -/** - * - */ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class IOTest { @Test public void testIO() throws InterruptedException { // Only a little test - ByteArrayInputStream in = new ByteArrayInputStream - ("The quick brown fox jumped over the lazy dog".getBytes()); + ByteArrayInputStream in = new ByteArrayInputStream("The quick brown fox jumped over the lazy dog".getBytes()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - IO.copyThread(in,out); + IO.copyThread(in, out); Thread.sleep(1500); // System.err.println(out); - assertEquals( "copyThread", - out.toString(), - "The quick brown fox jumped over the lazy dog"); + assertEquals("copyThread", out.toString(), "The quick brown fox jumped over the lazy dog"); } - + @Test public void testHalfClose() throws Exception { ServerSocket connector = new ServerSocket(0); - - Socket client = new Socket("localhost",connector.getLocalPort()); + + Socket client = new Socket("localhost", connector.getLocalPort()); Socket server = connector.accept(); - + // we can write both ways client.getOutputStream().write(1); - assertEquals(1,server.getInputStream().read()); + assertEquals(1, server.getInputStream().read()); server.getOutputStream().write(1); - assertEquals(1,client.getInputStream().read()); - + assertEquals(1, client.getInputStream().read()); + // shutdown output results in read -1 client.shutdownOutput(); - assertEquals(-1,server.getInputStream().read()); - - // Even though EOF has been read, the server input is not seen as shutdown + assertEquals(-1, server.getInputStream().read()); + + // Even though EOF has been read, the server input is not seen as shutdown assertFalse(server.isInputShutdown()); - + // and we can read -1 again - assertEquals(-1,server.getInputStream().read()); + assertEquals(-1, server.getInputStream().read()); // but cannot write - try { client.getOutputStream().write(1); fail("exception expected"); } catch (SocketException e) {} - - // but can still write in opposite direction. - server.getOutputStream().write(1); - assertEquals(1,client.getInputStream().read()); - - - // server can shutdown input to match the shutdown out of client - server.shutdownInput(); - - // now we EOF instead of reading -1 - try { server.getInputStream().read(); fail("exception expected"); } catch (SocketException e) {} - + try + { + client.getOutputStream().write(1); + fail("exception expected"); + } + catch (SocketException e) + { + } // but can still write in opposite direction. server.getOutputStream().write(1); - assertEquals(1,client.getInputStream().read()); - + assertEquals(1, client.getInputStream().read()); + + // server can shutdown input to match the shutdown out of client + server.shutdownInput(); + + // now we EOF instead of reading -1 + try + { + server.getInputStream().read(); + fail("exception expected"); + } + catch (SocketException e) + { + } + + // but can still write in opposite direction. + server.getOutputStream().write(1); + assertEquals(1, client.getInputStream().read()); + // client can shutdown input client.shutdownInput(); // now we EOF instead of reading -1 - try { client.getInputStream().read(); fail("exception expected"); } catch (SocketException e) {} - - // But we can still write at the server (data which will never be read) + try + { + client.getInputStream().read(); + fail("exception expected"); + } + catch (SocketException e) + { + } + + // But we can still write at the server (data which will never be read) server.getOutputStream().write(1); - + // and the server output is not shutdown - assertFalse( server.isOutputShutdown() ); - + assertFalse(server.isOutputShutdown()); + // until we explictly shut it down server.shutdownOutput(); - + // and now we can't write - try { server.getOutputStream().write(1); fail("exception expected"); } catch (SocketException e) {} - + try + { + server.getOutputStream().write(1); + fail("exception expected"); + } + catch (SocketException e) + { + } + // but the sockets are still open assertFalse(client.isClosed()); assertFalse(server.isClosed()); - + // but if we close one end client.close(); // it is seen as closed. assertTrue(client.isClosed()); - + // but not the other end assertFalse(server.isClosed()); - + // which has to be closed explictly server.close(); assertTrue(server.isClosed()); - } - @Test public void testHalfCloseClientServer() throws Exception { ServerSocketChannel connector = ServerSocketChannel.open(); connector.socket().bind(null); - + Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket(); client.setSoTimeout(1000); - client.setSoLinger(false,-1); + client.setSoLinger(false, -1); Socket server = connector.accept().socket(); server.setSoTimeout(1000); - server.setSoLinger(false,-1); - + server.setSoLinger(false, -1); + // Write from client to server client.getOutputStream().write(1); - - // Server reads - assertEquals(1,server.getInputStream().read()); + + // Server reads + assertEquals(1, server.getInputStream().read()); // Write from server to client with oshut server.getOutputStream().write(1); @@ -166,12 +184,12 @@ public class IOTest server.shutdownOutput(); // Client reads response - assertEquals(1,client.getInputStream().read()); + assertEquals(1, client.getInputStream().read()); try { // Client reads -1 and does ishut - assertEquals(-1,client.getInputStream().read()); + assertEquals(-1, client.getInputStream().read()); assertFalse(client.isInputShutdown()); //System.err.println("ISHUT "+client); client.shutdownInput(); @@ -183,7 +201,7 @@ public class IOTest client.close(); // Server reads -1, does ishut and then close - assertEquals(-1,server.getInputStream().read()); + assertEquals(-1, server.getInputStream().read()); assertFalse(server.isInputShutdown()); //System.err.println("ISHUT "+server); @@ -191,7 +209,7 @@ public class IOTest { server.shutdownInput(); } - catch(SocketException e) + catch (SocketException e) { // System.err.println(e); } @@ -199,7 +217,7 @@ public class IOTest server.close(); } - catch(Exception e) + catch (Exception e) { System.err.println(e); assertTrue(OS.IS_OSX); @@ -211,19 +229,19 @@ public class IOTest { ServerSocketChannel connector = ServerSocketChannel.open(); connector.socket().bind(null); - + Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket(); client.setSoTimeout(1000); - client.setSoLinger(false,-1); + client.setSoLinger(false, -1); Socket server = connector.accept().socket(); server.setSoTimeout(1000); - server.setSoLinger(false,-1); - + server.setSoLinger(false, -1); + // Write from client to server client.getOutputStream().write(1); - - // Server reads - assertEquals(1,server.getInputStream().read()); + + // Server reads + assertEquals(1, server.getInputStream().read()); // Write from server to client with oshut server.getOutputStream().write(1); @@ -233,39 +251,39 @@ public class IOTest try { // Client reads response - assertEquals(1,client.getInputStream().read()); + assertEquals(1, client.getInputStream().read()); - // Client reads -1 - assertEquals(-1,client.getInputStream().read()); + // Client reads -1 + assertEquals(-1, client.getInputStream().read()); assertFalse(client.isInputShutdown()); // Client can still write as we are half closed client.getOutputStream().write(1); - // Server can still read - assertEquals(1,server.getInputStream().read()); + // Server can still read + assertEquals(1, server.getInputStream().read()); - // Server now closes + // Server now closes server.close(); // Client still reads -1 (not broken pipe !!) - assertEquals(-1,client.getInputStream().read()); + assertEquals(-1, client.getInputStream().read()); assertFalse(client.isInputShutdown()); Thread.sleep(100); // Client still reads -1 (not broken pipe !!) - assertEquals(-1,client.getInputStream().read()); + assertEquals(-1, client.getInputStream().read()); assertFalse(client.isInputShutdown()); // Client can still write data even though server is closed??? client.getOutputStream().write(1); // Client eventually sees Broken Pipe - int i=0; + int i = 0; try { - for (i=0;i<100000;i++) + for (i = 0; i < 100000; i++) client.getOutputStream().write(1); Assert.fail(); @@ -289,62 +307,64 @@ public class IOTest ServerSocket connector; Socket client; Socket server; - + connector = new ServerSocket(0); - client = new Socket("127.0.0.1",connector.getLocalPort()); + client = new Socket("127.0.0.1", connector.getLocalPort()); server = connector.accept(); client.setTcpNoDelay(true); - client.setSoLinger(true,0); + client.setSoLinger(true, 0); server.setTcpNoDelay(true); - server.setSoLinger(true,0); - + server.setSoLinger(true, 0); + client.getOutputStream().write(1); - assertEquals(1,server.getInputStream().read()); + assertEquals(1, server.getInputStream().read()); server.getOutputStream().write(1); - assertEquals(1,client.getInputStream().read()); - + assertEquals(1, client.getInputStream().read()); + // Server generator shutdowns output after non persistent sending response. server.shutdownOutput(); - + // client endpoint reads EOF and shutdown input as result - assertEquals(-1,client.getInputStream().read()); + assertEquals(-1, client.getInputStream().read()); client.shutdownInput(); - + // client connection see's EOF and shutsdown output as no more requests to be sent. client.shutdownOutput(); - + // Since input already shutdown, client also closes socket. client.close(); - + // Server reads the EOF from client oshut and shut's down it's input - assertEquals(-1,server.getInputStream().read()); + assertEquals(-1, server.getInputStream().read()); server.shutdownInput(); - + // Since output was already shutdown, server closes server.close(); } - + @Test public void testAsyncSocketChannel() throws Exception { AsynchronousServerSocketChannel connector = AsynchronousServerSocketChannel.open(); connector.bind(null); - Future acceptor= connector.accept(); - + Future acceptor = connector.accept(); + AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); - client.connect(connector.getLocalAddress()); - - AsynchronousSocketChannel server = acceptor.get(); - + client.connect(connector.getLocalAddress()).get(5, TimeUnit.SECONDS); + + AsynchronousSocketChannel server = acceptor.get(5, TimeUnit.SECONDS); + ByteBuffer read = ByteBuffer.allocate(1024); - Future reading=server.read(read); - - ByteBuffer write= BufferUtil.toBuffer("Testing 1 2 3"); - Future writing=client.write(write); - - reading.get(); - writing.get(); + Future reading = server.read(read); + + byte[] data = "Testing 1 2 3".getBytes("UTF-8"); + ByteBuffer write = BufferUtil.toBuffer(data); + Future writing = client.write(write); + + writing.get(5, TimeUnit.SECONDS); + reading.get(5, TimeUnit.SECONDS); read.flip(); - + + Assert.assertEquals(ByteBuffer.wrap(data), read); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java index 48c9d95193d..dd98ec86ad5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java @@ -34,20 +34,17 @@ public class LocalHttpConnector extends HttpConnector private final BlockingQueue _connects = new LinkedBlockingQueue(); private LocalExecutor _executor; - /* ------------------------------------------------------------ */ public LocalHttpConnector() { setIdleTimeout(30000); } - /* ------------------------------------------------------------ */ @Override public Object getTransport() { return this; } - /* ------------------------------------------------------------ */ /** Sends requests and get's responses based on thread activity. * Returns all the responses received once the thread activity has * returned to the level it was before the requests. @@ -61,7 +58,6 @@ public class LocalHttpConnector extends HttpConnector return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET); } - /* ------------------------------------------------------------ */ /** Sends requests and get's responses based on thread activity. * Returns all the responses received once the thread activity has * returned to the level it was before the requests. @@ -78,7 +74,6 @@ public class LocalHttpConnector extends HttpConnector return request.takeOutput(); } - /* ------------------------------------------------------------ */ /** * Execute a request and return the EndPoint through which * responses can be received. @@ -88,14 +83,13 @@ public class LocalHttpConnector extends HttpConnector public LocalEndPoint executeRequest(String rawRequest) { Phaser phaser=_executor._phaser; - int phase = phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes + phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes LocalEndPoint endp = new LocalEndPoint(); endp.setInput(BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET)); _connects.add(endp); return endp; } - /* ------------------------------------------------------------ */ @Override protected void accept(int acceptorID) throws IOException, InterruptedException { @@ -104,11 +98,11 @@ public class LocalHttpConnector extends HttpConnector HttpConnection connection=new HttpConnection(this,endp,getServer()); endp.setAsyncConnection(connection); endp.onOpen(); + connection.onOpen(); connectionOpened(connection); _executor._phaser.arriveAndDeregister(); // arrive for the register done in getResponses } - /* ------------------------------------------------------------ */ @Override protected void doStart() throws Exception { @@ -116,7 +110,6 @@ public class LocalHttpConnector extends HttpConnector _executor=new LocalExecutor(findExecutor()); } - /* ------------------------------------------------------------ */ @Override protected void doStop() throws Exception { @@ -124,28 +117,25 @@ public class LocalHttpConnector extends HttpConnector _executor=null; } - /* ------------------------------------------------------------ */ @Override public Executor findExecutor() { return _executor==null?super.findExecutor():_executor; } - /* ------------------------------------------------------------ */ - class LocalExecutor implements Executor + private class LocalExecutor implements Executor { - Phaser _phaser=new Phaser() + private final Phaser _phaser=new Phaser() { - @Override protected boolean onAdvance(int phase, int registeredParties) { return false; } - }; - final Executor _executor; - LocalExecutor(Executor e) + private final Executor _executor; + + private LocalExecutor(Executor e) { _executor=e; } @@ -173,7 +163,6 @@ public class LocalHttpConnector extends HttpConnector } } - /* ------------------------------------------------------------ */ public class LocalEndPoint extends AsyncByteArrayEndPoint { private CountDownLatch _closed = new CountDownLatch(1); @@ -184,7 +173,6 @@ public class LocalHttpConnector extends HttpConnector setIdleTimeout(LocalHttpConnector.this.getIdleTimeout()); } - /* ------------------------------------------------------------ */ public void addInput(String s) { // TODO this is a busy wait @@ -193,7 +181,6 @@ public class LocalHttpConnector extends HttpConnector setInput(BufferUtil.toBuffer(s, StringUtil.__UTF8_CHARSET)); } - /* ------------------------------------------------------------ */ @Override public void close() { @@ -206,7 +193,6 @@ public class LocalHttpConnector extends HttpConnector } } - /* ------------------------------------------------------------ */ @Override public void onClose() { @@ -214,7 +200,6 @@ public class LocalHttpConnector extends HttpConnector _closed.countDown(); } - /* ------------------------------------------------------------ */ @Override public void shutdownOutput() { @@ -222,7 +207,6 @@ public class LocalHttpConnector extends HttpConnector close(); } - /* ------------------------------------------------------------ */ public void waitUntilClosed() { while (isOpen()) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java index 49cc1d09466..9dff1d37b17 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/StatisticsHandlerTest.java @@ -13,17 +13,11 @@ package org.eclipse.jetty.server.handler; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -38,6 +32,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class StatisticsHandlerTest { private Server _server; @@ -71,7 +70,7 @@ public class StatisticsHandlerTest @Test public void testRequest() throws Exception { - final CyclicBarrier barrier[] = { new CyclicBarrier(2), new CyclicBarrier(2)}; + final CyclicBarrier barrier[] = {new CyclicBarrier(2), new CyclicBarrier(2)}; _statsHandler.setHandler(new AbstractHandler() {