diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java new file mode 100644 index 00000000000..94885e2798a --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java @@ -0,0 +1,192 @@ +// ======================================================================== +// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.Assert; +import org.junit.Test; + +public class SelectChannelEndPointInterestsTest +{ + private QueuedThreadPool threadPool; + private ScheduledExecutorService scheduler; + private ServerSocketChannel connector; + private SelectorManager selectorManager; + + public void init(final Interested interested) throws Exception + { + threadPool = new QueuedThreadPool(); + threadPool.start(); + + scheduler = Executors.newSingleThreadScheduledExecutor(); + + connector = ServerSocketChannel.open(); + connector.bind(new InetSocketAddress("localhost", 0)); + + selectorManager = new SelectorManager() + { + @Override + protected void execute(Runnable task) + { + threadPool.execute(task); + } + + @Override + protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + { + return new SelectChannelEndPoint(channel, selector, selectionKey, scheduler, 60000) + { + @Override + protected void onIncompleteFlush() + { + super.onIncompleteFlush(); + interested.onIncompleteFlush(); + } + }; + } + + @Override + public AsyncConnection newConnection(SocketChannel channel, final AsyncEndPoint endPoint, Object attachment) + { + return new AbstractAsyncConnection(endPoint, threadPool) + { + @Override + public void onFillable() + { + interested.onFillable(endPoint, this); + } + }; + } + }; + selectorManager.start(); + } + + @Test + public void testReadBlockedThenWriteBlockedThenReadableThenWritable() throws Exception + { + final AtomicInteger size = new AtomicInteger(1024 * 1024); + final AtomicReference failure = new AtomicReference<>(); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + final AtomicBoolean writeBlocked = new AtomicBoolean(); + init(new Interested() + { + @Override + public void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection) + { + ByteBuffer input = BufferUtil.allocate(2); + int read = fill(endPoint, input); + + if (read == 1) + { + byte b = input.get(); + if (b == 1) + { + connection.fillInterested(); + + ByteBuffer output = ByteBuffer.allocate(size.get()); + endPoint.write(null, new Callback.Empty<>(), output); + + latch1.countDown(); + } + else + { + latch2.countDown(); + } + } + else + { + failure.set(new Exception("Unexpectedly read " + read + " bytes")); + } + } + + @Override + public void onIncompleteFlush() + { + writeBlocked.set(true); + } + + private int fill(AsyncEndPoint endPoint, ByteBuffer buffer) + { + try + { + return endPoint.fill(buffer); + } + catch (IOException x) + { + failure.set(x); + return 0; + } + } + }); + + Socket client = new Socket(); + client.connect(connector.getLocalAddress()); + client.setSoTimeout(5000); + + SocketChannel server = connector.accept(); + server.configureBlocking(false); + selectorManager.accept(server); + + OutputStream clientOutput = client.getOutputStream(); + clientOutput.write(1); + clientOutput.flush(); + Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS)); + + // We do not read to keep the socket write blocked + + clientOutput.write(2); + clientOutput.flush(); + Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS)); + + // Sleep before reading to allow waking up the server only for read + Thread.sleep(1000); + + // Now read what was written, waking up the server for write + InputStream clientInput = client.getInputStream(); + while (size.getAndDecrement() > 0) + clientInput.read(); + + client.close(); + + Assert.assertNull(failure.get()); + } + + private interface Interested + { + void onFillable(AsyncEndPoint endPoint, AbstractAsyncConnection connection); + + void onIncompleteFlush(); + } + +}