Jetty9 - Test fixing.
This commit is contained in:
parent
851239ee56
commit
b08bca2abb
|
@ -25,8 +25,11 @@ import java.nio.channels.Selector;
|
||||||
import java.nio.channels.ServerSocketChannel;
|
import java.nio.channels.ServerSocketChannel;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -303,7 +306,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
||||||
*/
|
*/
|
||||||
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
|
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
|
||||||
{
|
{
|
||||||
private final ConcurrentLinkedQueue<Runnable> _changes = new ConcurrentLinkedQueue<>();
|
private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
|
||||||
|
private final Set<AsyncEndPoint> _endPoints = Collections.newSetFromMap(new ConcurrentHashMap<AsyncEndPoint, Boolean>());
|
||||||
private final int _id;
|
private final int _id;
|
||||||
private Selector _selector;
|
private Selector _selector;
|
||||||
private Thread _thread;
|
private Thread _thread;
|
||||||
|
@ -342,7 +346,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
||||||
{
|
{
|
||||||
if (Thread.currentThread() != _thread)
|
if (Thread.currentThread() != _thread)
|
||||||
{
|
{
|
||||||
_changes.add(change);
|
_changes.offer(change);
|
||||||
LOG.debug("Queued change {}", change);
|
LOG.debug("Queued change {}", change);
|
||||||
boolean wakeup = _needsWakeup;
|
boolean wakeup = _needsWakeup;
|
||||||
if (wakeup)
|
if (wakeup)
|
||||||
|
@ -362,9 +366,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
||||||
{
|
{
|
||||||
Runnable change;
|
Runnable change;
|
||||||
while ((change = _changes.poll()) != null)
|
while ((change = _changes.poll()) != null)
|
||||||
{
|
|
||||||
runChange(change);
|
runChange(change);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void runChange(Runnable change)
|
protected void runChange(Runnable change)
|
||||||
|
@ -531,9 +533,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
||||||
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
|
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
|
||||||
{
|
{
|
||||||
AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey);
|
AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey);
|
||||||
|
_endPoints.add(endPoint);
|
||||||
endPointOpened(endPoint);
|
endPointOpened(endPoint);
|
||||||
endPoint.setAsyncConnection(newConnection(channel, endPoint, selectionKey.attachment()));
|
AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment());
|
||||||
endPoint.getAsyncConnection().onOpen();
|
endPoint.setAsyncConnection(asyncConnection);
|
||||||
|
asyncConnection.onOpen();
|
||||||
LOG.debug("Created {}", endPoint);
|
LOG.debug("Created {}", endPoint);
|
||||||
return endPoint;
|
return endPoint;
|
||||||
}
|
}
|
||||||
|
@ -541,6 +545,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
||||||
public void destroyEndPoint(AsyncEndPoint endPoint)
|
public void destroyEndPoint(AsyncEndPoint endPoint)
|
||||||
{
|
{
|
||||||
LOG.debug("Destroyed {}", endPoint);
|
LOG.debug("Destroyed {}", endPoint);
|
||||||
|
_endPoints.remove(endPoint);
|
||||||
endPoint.getAsyncConnection().onClose();
|
endPoint.getAsyncConnection().onClose();
|
||||||
endPointClosed(endPoint);
|
endPointClosed(endPoint);
|
||||||
}
|
}
|
||||||
|
@ -607,13 +612,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
||||||
|
|
||||||
private void timeoutCheck()
|
private void timeoutCheck()
|
||||||
{
|
{
|
||||||
|
// We cannot use the _selector.keys() because the returned Set is not thread
|
||||||
|
// safe so it may be modified by the selector thread while we iterate here.
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
for (SelectionKey key : _selector.keys())
|
for (AsyncEndPoint endPoint : _endPoints)
|
||||||
{
|
endPoint.checkTimeout(now);
|
||||||
Object attachment = key.attachment();
|
|
||||||
if (attachment instanceof AsyncEndPoint)
|
|
||||||
((AsyncEndPoint)attachment).checkTimeout(now);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class DumpKeys implements Runnable
|
private class DumpKeys implements Runnable
|
||||||
|
|
|
@ -13,11 +13,6 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.io;
|
package org.eclipse.jetty.io;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -30,135 +25,158 @@ import java.nio.channels.AsynchronousSocketChannel;
|
||||||
import java.nio.channels.ServerSocketChannel;
|
import java.nio.channels.ServerSocketChannel;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.toolchain.test.OS;
|
import org.eclipse.jetty.toolchain.test.OS;
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
import org.eclipse.jetty.util.IO;
|
import org.eclipse.jetty.util.IO;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
import static org.junit.Assert.assertEquals;
|
||||||
*
|
import static org.junit.Assert.assertFalse;
|
||||||
*/
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class IOTest
|
public class IOTest
|
||||||
{
|
{
|
||||||
@Test
|
@Test
|
||||||
public void testIO() throws InterruptedException
|
public void testIO() throws InterruptedException
|
||||||
{
|
{
|
||||||
// Only a little test
|
// Only a little test
|
||||||
ByteArrayInputStream in = new ByteArrayInputStream
|
ByteArrayInputStream in = new ByteArrayInputStream("The quick brown fox jumped over the lazy dog".getBytes());
|
||||||
("The quick brown fox jumped over the lazy dog".getBytes());
|
|
||||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||||
|
|
||||||
IO.copyThread(in,out);
|
IO.copyThread(in, out);
|
||||||
Thread.sleep(1500);
|
Thread.sleep(1500);
|
||||||
// System.err.println(out);
|
// System.err.println(out);
|
||||||
|
|
||||||
assertEquals( "copyThread",
|
assertEquals("copyThread", out.toString(), "The quick brown fox jumped over the lazy dog");
|
||||||
out.toString(),
|
|
||||||
"The quick brown fox jumped over the lazy dog");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHalfClose() throws Exception
|
public void testHalfClose() throws Exception
|
||||||
{
|
{
|
||||||
ServerSocket connector = new ServerSocket(0);
|
ServerSocket connector = new ServerSocket(0);
|
||||||
|
|
||||||
Socket client = new Socket("localhost",connector.getLocalPort());
|
Socket client = new Socket("localhost", connector.getLocalPort());
|
||||||
Socket server = connector.accept();
|
Socket server = connector.accept();
|
||||||
|
|
||||||
// we can write both ways
|
// we can write both ways
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
assertEquals(1,server.getInputStream().read());
|
assertEquals(1, server.getInputStream().read());
|
||||||
server.getOutputStream().write(1);
|
server.getOutputStream().write(1);
|
||||||
assertEquals(1,client.getInputStream().read());
|
assertEquals(1, client.getInputStream().read());
|
||||||
|
|
||||||
// shutdown output results in read -1
|
// shutdown output results in read -1
|
||||||
client.shutdownOutput();
|
client.shutdownOutput();
|
||||||
assertEquals(-1,server.getInputStream().read());
|
assertEquals(-1, server.getInputStream().read());
|
||||||
|
|
||||||
// Even though EOF has been read, the server input is not seen as shutdown
|
// Even though EOF has been read, the server input is not seen as shutdown
|
||||||
assertFalse(server.isInputShutdown());
|
assertFalse(server.isInputShutdown());
|
||||||
|
|
||||||
// and we can read -1 again
|
// and we can read -1 again
|
||||||
assertEquals(-1,server.getInputStream().read());
|
assertEquals(-1, server.getInputStream().read());
|
||||||
|
|
||||||
// but cannot write
|
// but cannot write
|
||||||
try { client.getOutputStream().write(1); fail("exception expected"); } catch (SocketException e) {}
|
try
|
||||||
|
{
|
||||||
// but can still write in opposite direction.
|
client.getOutputStream().write(1);
|
||||||
server.getOutputStream().write(1);
|
fail("exception expected");
|
||||||
assertEquals(1,client.getInputStream().read());
|
}
|
||||||
|
catch (SocketException e)
|
||||||
|
{
|
||||||
// server can shutdown input to match the shutdown out of client
|
}
|
||||||
server.shutdownInput();
|
|
||||||
|
|
||||||
// now we EOF instead of reading -1
|
|
||||||
try { server.getInputStream().read(); fail("exception expected"); } catch (SocketException e) {}
|
|
||||||
|
|
||||||
|
|
||||||
// but can still write in opposite direction.
|
// but can still write in opposite direction.
|
||||||
server.getOutputStream().write(1);
|
server.getOutputStream().write(1);
|
||||||
assertEquals(1,client.getInputStream().read());
|
assertEquals(1, client.getInputStream().read());
|
||||||
|
|
||||||
|
// server can shutdown input to match the shutdown out of client
|
||||||
|
server.shutdownInput();
|
||||||
|
|
||||||
|
// now we EOF instead of reading -1
|
||||||
|
try
|
||||||
|
{
|
||||||
|
server.getInputStream().read();
|
||||||
|
fail("exception expected");
|
||||||
|
}
|
||||||
|
catch (SocketException e)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
// but can still write in opposite direction.
|
||||||
|
server.getOutputStream().write(1);
|
||||||
|
assertEquals(1, client.getInputStream().read());
|
||||||
|
|
||||||
// client can shutdown input
|
// client can shutdown input
|
||||||
client.shutdownInput();
|
client.shutdownInput();
|
||||||
|
|
||||||
// now we EOF instead of reading -1
|
// now we EOF instead of reading -1
|
||||||
try { client.getInputStream().read(); fail("exception expected"); } catch (SocketException e) {}
|
try
|
||||||
|
{
|
||||||
// But we can still write at the server (data which will never be read)
|
client.getInputStream().read();
|
||||||
|
fail("exception expected");
|
||||||
|
}
|
||||||
|
catch (SocketException e)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
// But we can still write at the server (data which will never be read)
|
||||||
server.getOutputStream().write(1);
|
server.getOutputStream().write(1);
|
||||||
|
|
||||||
// and the server output is not shutdown
|
// and the server output is not shutdown
|
||||||
assertFalse( server.isOutputShutdown() );
|
assertFalse(server.isOutputShutdown());
|
||||||
|
|
||||||
// until we explictly shut it down
|
// until we explictly shut it down
|
||||||
server.shutdownOutput();
|
server.shutdownOutput();
|
||||||
|
|
||||||
// and now we can't write
|
// and now we can't write
|
||||||
try { server.getOutputStream().write(1); fail("exception expected"); } catch (SocketException e) {}
|
try
|
||||||
|
{
|
||||||
|
server.getOutputStream().write(1);
|
||||||
|
fail("exception expected");
|
||||||
|
}
|
||||||
|
catch (SocketException e)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
// but the sockets are still open
|
// but the sockets are still open
|
||||||
assertFalse(client.isClosed());
|
assertFalse(client.isClosed());
|
||||||
assertFalse(server.isClosed());
|
assertFalse(server.isClosed());
|
||||||
|
|
||||||
// but if we close one end
|
// but if we close one end
|
||||||
client.close();
|
client.close();
|
||||||
|
|
||||||
// it is seen as closed.
|
// it is seen as closed.
|
||||||
assertTrue(client.isClosed());
|
assertTrue(client.isClosed());
|
||||||
|
|
||||||
// but not the other end
|
// but not the other end
|
||||||
assertFalse(server.isClosed());
|
assertFalse(server.isClosed());
|
||||||
|
|
||||||
// which has to be closed explictly
|
// which has to be closed explictly
|
||||||
server.close();
|
server.close();
|
||||||
assertTrue(server.isClosed());
|
assertTrue(server.isClosed());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHalfCloseClientServer() throws Exception
|
public void testHalfCloseClientServer() throws Exception
|
||||||
{
|
{
|
||||||
ServerSocketChannel connector = ServerSocketChannel.open();
|
ServerSocketChannel connector = ServerSocketChannel.open();
|
||||||
connector.socket().bind(null);
|
connector.socket().bind(null);
|
||||||
|
|
||||||
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
|
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
|
||||||
client.setSoTimeout(1000);
|
client.setSoTimeout(1000);
|
||||||
client.setSoLinger(false,-1);
|
client.setSoLinger(false, -1);
|
||||||
Socket server = connector.accept().socket();
|
Socket server = connector.accept().socket();
|
||||||
server.setSoTimeout(1000);
|
server.setSoTimeout(1000);
|
||||||
server.setSoLinger(false,-1);
|
server.setSoLinger(false, -1);
|
||||||
|
|
||||||
// Write from client to server
|
// Write from client to server
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
|
|
||||||
// Server reads
|
// Server reads
|
||||||
assertEquals(1,server.getInputStream().read());
|
assertEquals(1, server.getInputStream().read());
|
||||||
|
|
||||||
// Write from server to client with oshut
|
// Write from server to client with oshut
|
||||||
server.getOutputStream().write(1);
|
server.getOutputStream().write(1);
|
||||||
|
@ -166,12 +184,12 @@ public class IOTest
|
||||||
server.shutdownOutput();
|
server.shutdownOutput();
|
||||||
|
|
||||||
// Client reads response
|
// Client reads response
|
||||||
assertEquals(1,client.getInputStream().read());
|
assertEquals(1, client.getInputStream().read());
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Client reads -1 and does ishut
|
// Client reads -1 and does ishut
|
||||||
assertEquals(-1,client.getInputStream().read());
|
assertEquals(-1, client.getInputStream().read());
|
||||||
assertFalse(client.isInputShutdown());
|
assertFalse(client.isInputShutdown());
|
||||||
//System.err.println("ISHUT "+client);
|
//System.err.println("ISHUT "+client);
|
||||||
client.shutdownInput();
|
client.shutdownInput();
|
||||||
|
@ -183,7 +201,7 @@ public class IOTest
|
||||||
client.close();
|
client.close();
|
||||||
|
|
||||||
// Server reads -1, does ishut and then close
|
// Server reads -1, does ishut and then close
|
||||||
assertEquals(-1,server.getInputStream().read());
|
assertEquals(-1, server.getInputStream().read());
|
||||||
assertFalse(server.isInputShutdown());
|
assertFalse(server.isInputShutdown());
|
||||||
//System.err.println("ISHUT "+server);
|
//System.err.println("ISHUT "+server);
|
||||||
|
|
||||||
|
@ -191,7 +209,7 @@ public class IOTest
|
||||||
{
|
{
|
||||||
server.shutdownInput();
|
server.shutdownInput();
|
||||||
}
|
}
|
||||||
catch(SocketException e)
|
catch (SocketException e)
|
||||||
{
|
{
|
||||||
// System.err.println(e);
|
// System.err.println(e);
|
||||||
}
|
}
|
||||||
|
@ -199,7 +217,7 @@ public class IOTest
|
||||||
server.close();
|
server.close();
|
||||||
|
|
||||||
}
|
}
|
||||||
catch(Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
System.err.println(e);
|
System.err.println(e);
|
||||||
assertTrue(OS.IS_OSX);
|
assertTrue(OS.IS_OSX);
|
||||||
|
@ -211,19 +229,19 @@ public class IOTest
|
||||||
{
|
{
|
||||||
ServerSocketChannel connector = ServerSocketChannel.open();
|
ServerSocketChannel connector = ServerSocketChannel.open();
|
||||||
connector.socket().bind(null);
|
connector.socket().bind(null);
|
||||||
|
|
||||||
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
|
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
|
||||||
client.setSoTimeout(1000);
|
client.setSoTimeout(1000);
|
||||||
client.setSoLinger(false,-1);
|
client.setSoLinger(false, -1);
|
||||||
Socket server = connector.accept().socket();
|
Socket server = connector.accept().socket();
|
||||||
server.setSoTimeout(1000);
|
server.setSoTimeout(1000);
|
||||||
server.setSoLinger(false,-1);
|
server.setSoLinger(false, -1);
|
||||||
|
|
||||||
// Write from client to server
|
// Write from client to server
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
|
|
||||||
// Server reads
|
// Server reads
|
||||||
assertEquals(1,server.getInputStream().read());
|
assertEquals(1, server.getInputStream().read());
|
||||||
|
|
||||||
// Write from server to client with oshut
|
// Write from server to client with oshut
|
||||||
server.getOutputStream().write(1);
|
server.getOutputStream().write(1);
|
||||||
|
@ -233,39 +251,39 @@ public class IOTest
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// Client reads response
|
// Client reads response
|
||||||
assertEquals(1,client.getInputStream().read());
|
assertEquals(1, client.getInputStream().read());
|
||||||
|
|
||||||
// Client reads -1
|
// Client reads -1
|
||||||
assertEquals(-1,client.getInputStream().read());
|
assertEquals(-1, client.getInputStream().read());
|
||||||
assertFalse(client.isInputShutdown());
|
assertFalse(client.isInputShutdown());
|
||||||
|
|
||||||
// Client can still write as we are half closed
|
// Client can still write as we are half closed
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
|
|
||||||
// Server can still read
|
// Server can still read
|
||||||
assertEquals(1,server.getInputStream().read());
|
assertEquals(1, server.getInputStream().read());
|
||||||
|
|
||||||
// Server now closes
|
// Server now closes
|
||||||
server.close();
|
server.close();
|
||||||
|
|
||||||
// Client still reads -1 (not broken pipe !!)
|
// Client still reads -1 (not broken pipe !!)
|
||||||
assertEquals(-1,client.getInputStream().read());
|
assertEquals(-1, client.getInputStream().read());
|
||||||
assertFalse(client.isInputShutdown());
|
assertFalse(client.isInputShutdown());
|
||||||
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
||||||
// Client still reads -1 (not broken pipe !!)
|
// Client still reads -1 (not broken pipe !!)
|
||||||
assertEquals(-1,client.getInputStream().read());
|
assertEquals(-1, client.getInputStream().read());
|
||||||
assertFalse(client.isInputShutdown());
|
assertFalse(client.isInputShutdown());
|
||||||
|
|
||||||
// Client can still write data even though server is closed???
|
// Client can still write data even though server is closed???
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
|
|
||||||
// Client eventually sees Broken Pipe
|
// Client eventually sees Broken Pipe
|
||||||
int i=0;
|
int i = 0;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
for (i=0;i<100000;i++)
|
for (i = 0; i < 100000; i++)
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
|
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
|
@ -289,62 +307,64 @@ public class IOTest
|
||||||
ServerSocket connector;
|
ServerSocket connector;
|
||||||
Socket client;
|
Socket client;
|
||||||
Socket server;
|
Socket server;
|
||||||
|
|
||||||
connector = new ServerSocket(0);
|
connector = new ServerSocket(0);
|
||||||
client = new Socket("127.0.0.1",connector.getLocalPort());
|
client = new Socket("127.0.0.1", connector.getLocalPort());
|
||||||
server = connector.accept();
|
server = connector.accept();
|
||||||
client.setTcpNoDelay(true);
|
client.setTcpNoDelay(true);
|
||||||
client.setSoLinger(true,0);
|
client.setSoLinger(true, 0);
|
||||||
server.setTcpNoDelay(true);
|
server.setTcpNoDelay(true);
|
||||||
server.setSoLinger(true,0);
|
server.setSoLinger(true, 0);
|
||||||
|
|
||||||
client.getOutputStream().write(1);
|
client.getOutputStream().write(1);
|
||||||
assertEquals(1,server.getInputStream().read());
|
assertEquals(1, server.getInputStream().read());
|
||||||
server.getOutputStream().write(1);
|
server.getOutputStream().write(1);
|
||||||
assertEquals(1,client.getInputStream().read());
|
assertEquals(1, client.getInputStream().read());
|
||||||
|
|
||||||
// Server generator shutdowns output after non persistent sending response.
|
// Server generator shutdowns output after non persistent sending response.
|
||||||
server.shutdownOutput();
|
server.shutdownOutput();
|
||||||
|
|
||||||
// client endpoint reads EOF and shutdown input as result
|
// client endpoint reads EOF and shutdown input as result
|
||||||
assertEquals(-1,client.getInputStream().read());
|
assertEquals(-1, client.getInputStream().read());
|
||||||
client.shutdownInput();
|
client.shutdownInput();
|
||||||
|
|
||||||
// client connection see's EOF and shutsdown output as no more requests to be sent.
|
// client connection see's EOF and shutsdown output as no more requests to be sent.
|
||||||
client.shutdownOutput();
|
client.shutdownOutput();
|
||||||
|
|
||||||
// Since input already shutdown, client also closes socket.
|
// Since input already shutdown, client also closes socket.
|
||||||
client.close();
|
client.close();
|
||||||
|
|
||||||
// Server reads the EOF from client oshut and shut's down it's input
|
// Server reads the EOF from client oshut and shut's down it's input
|
||||||
assertEquals(-1,server.getInputStream().read());
|
assertEquals(-1, server.getInputStream().read());
|
||||||
server.shutdownInput();
|
server.shutdownInput();
|
||||||
|
|
||||||
// Since output was already shutdown, server closes
|
// Since output was already shutdown, server closes
|
||||||
server.close();
|
server.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAsyncSocketChannel() throws Exception
|
public void testAsyncSocketChannel() throws Exception
|
||||||
{
|
{
|
||||||
AsynchronousServerSocketChannel connector = AsynchronousServerSocketChannel.open();
|
AsynchronousServerSocketChannel connector = AsynchronousServerSocketChannel.open();
|
||||||
connector.bind(null);
|
connector.bind(null);
|
||||||
Future<AsynchronousSocketChannel> acceptor= connector.accept();
|
Future<AsynchronousSocketChannel> acceptor = connector.accept();
|
||||||
|
|
||||||
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
|
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
|
||||||
client.connect(connector.getLocalAddress());
|
client.connect(connector.getLocalAddress()).get(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
AsynchronousSocketChannel server = acceptor.get();
|
AsynchronousSocketChannel server = acceptor.get(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
ByteBuffer read = ByteBuffer.allocate(1024);
|
ByteBuffer read = ByteBuffer.allocate(1024);
|
||||||
Future<Integer> reading=server.read(read);
|
Future<Integer> reading = server.read(read);
|
||||||
|
|
||||||
ByteBuffer write= BufferUtil.toBuffer("Testing 1 2 3");
|
byte[] data = "Testing 1 2 3".getBytes("UTF-8");
|
||||||
Future<Integer> writing=client.write(write);
|
ByteBuffer write = BufferUtil.toBuffer(data);
|
||||||
|
Future<Integer> writing = client.write(write);
|
||||||
reading.get();
|
|
||||||
writing.get();
|
writing.get(5, TimeUnit.SECONDS);
|
||||||
|
reading.get(5, TimeUnit.SECONDS);
|
||||||
read.flip();
|
read.flip();
|
||||||
|
|
||||||
|
Assert.assertEquals(ByteBuffer.wrap(data), read);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,20 +34,17 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<LocalEndPoint>();
|
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<LocalEndPoint>();
|
||||||
private LocalExecutor _executor;
|
private LocalExecutor _executor;
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
public LocalHttpConnector()
|
public LocalHttpConnector()
|
||||||
{
|
{
|
||||||
setIdleTimeout(30000);
|
setIdleTimeout(30000);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
public Object getTransport()
|
public Object getTransport()
|
||||||
{
|
{
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
/** Sends requests and get's responses based on thread activity.
|
/** Sends requests and get's responses based on thread activity.
|
||||||
* Returns all the responses received once the thread activity has
|
* Returns all the responses received once the thread activity has
|
||||||
* returned to the level it was before the requests.
|
* returned to the level it was before the requests.
|
||||||
|
@ -61,7 +58,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET);
|
return result==null?null:BufferUtil.toString(result,StringUtil.__UTF8_CHARSET);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
/** Sends requests and get's responses based on thread activity.
|
/** Sends requests and get's responses based on thread activity.
|
||||||
* Returns all the responses received once the thread activity has
|
* Returns all the responses received once the thread activity has
|
||||||
* returned to the level it was before the requests.
|
* returned to the level it was before the requests.
|
||||||
|
@ -78,7 +74,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
return request.takeOutput();
|
return request.takeOutput();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
/**
|
/**
|
||||||
* Execute a request and return the EndPoint through which
|
* Execute a request and return the EndPoint through which
|
||||||
* responses can be received.
|
* responses can be received.
|
||||||
|
@ -88,14 +83,13 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
public LocalEndPoint executeRequest(String rawRequest)
|
public LocalEndPoint executeRequest(String rawRequest)
|
||||||
{
|
{
|
||||||
Phaser phaser=_executor._phaser;
|
Phaser phaser=_executor._phaser;
|
||||||
int phase = phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes
|
phaser.register(); // the corresponding arrival will be done by the acceptor thread when it takes
|
||||||
LocalEndPoint endp = new LocalEndPoint();
|
LocalEndPoint endp = new LocalEndPoint();
|
||||||
endp.setInput(BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET));
|
endp.setInput(BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET));
|
||||||
_connects.add(endp);
|
_connects.add(endp);
|
||||||
return endp;
|
return endp;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
protected void accept(int acceptorID) throws IOException, InterruptedException
|
protected void accept(int acceptorID) throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
|
@ -104,11 +98,11 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
HttpConnection connection=new HttpConnection(this,endp,getServer());
|
HttpConnection connection=new HttpConnection(this,endp,getServer());
|
||||||
endp.setAsyncConnection(connection);
|
endp.setAsyncConnection(connection);
|
||||||
endp.onOpen();
|
endp.onOpen();
|
||||||
|
connection.onOpen();
|
||||||
connectionOpened(connection);
|
connectionOpened(connection);
|
||||||
_executor._phaser.arriveAndDeregister(); // arrive for the register done in getResponses
|
_executor._phaser.arriveAndDeregister(); // arrive for the register done in getResponses
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() throws Exception
|
protected void doStart() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -116,7 +110,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
_executor=new LocalExecutor(findExecutor());
|
_executor=new LocalExecutor(findExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws Exception
|
protected void doStop() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -124,28 +117,25 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
_executor=null;
|
_executor=null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
public Executor findExecutor()
|
public Executor findExecutor()
|
||||||
{
|
{
|
||||||
return _executor==null?super.findExecutor():_executor;
|
return _executor==null?super.findExecutor():_executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
private class LocalExecutor implements Executor
|
||||||
class LocalExecutor implements Executor
|
|
||||||
{
|
{
|
||||||
Phaser _phaser=new Phaser()
|
private final Phaser _phaser=new Phaser()
|
||||||
{
|
{
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean onAdvance(int phase, int registeredParties)
|
protected boolean onAdvance(int phase, int registeredParties)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
final Executor _executor;
|
private final Executor _executor;
|
||||||
LocalExecutor(Executor e)
|
|
||||||
|
private LocalExecutor(Executor e)
|
||||||
{
|
{
|
||||||
_executor=e;
|
_executor=e;
|
||||||
}
|
}
|
||||||
|
@ -173,7 +163,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
public class LocalEndPoint extends AsyncByteArrayEndPoint
|
public class LocalEndPoint extends AsyncByteArrayEndPoint
|
||||||
{
|
{
|
||||||
private CountDownLatch _closed = new CountDownLatch(1);
|
private CountDownLatch _closed = new CountDownLatch(1);
|
||||||
|
@ -184,7 +173,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
setIdleTimeout(LocalHttpConnector.this.getIdleTimeout());
|
setIdleTimeout(LocalHttpConnector.this.getIdleTimeout());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
public void addInput(String s)
|
public void addInput(String s)
|
||||||
{
|
{
|
||||||
// TODO this is a busy wait
|
// TODO this is a busy wait
|
||||||
|
@ -193,7 +181,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
setInput(BufferUtil.toBuffer(s, StringUtil.__UTF8_CHARSET));
|
setInput(BufferUtil.toBuffer(s, StringUtil.__UTF8_CHARSET));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
|
@ -206,7 +193,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose()
|
public void onClose()
|
||||||
{
|
{
|
||||||
|
@ -214,7 +200,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
_closed.countDown();
|
_closed.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdownOutput()
|
public void shutdownOutput()
|
||||||
{
|
{
|
||||||
|
@ -222,7 +207,6 @@ public class LocalHttpConnector extends HttpConnector
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------------------------------------------ */
|
|
||||||
public void waitUntilClosed()
|
public void waitUntilClosed()
|
||||||
{
|
{
|
||||||
while (isOpen())
|
while (isOpen())
|
||||||
|
|
|
@ -13,17 +13,11 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.server.handler;
|
package org.eclipse.jetty.server.handler;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
@ -38,6 +32,11 @@ import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class StatisticsHandlerTest
|
public class StatisticsHandlerTest
|
||||||
{
|
{
|
||||||
private Server _server;
|
private Server _server;
|
||||||
|
@ -71,7 +70,7 @@ public class StatisticsHandlerTest
|
||||||
@Test
|
@Test
|
||||||
public void testRequest() throws Exception
|
public void testRequest() throws Exception
|
||||||
{
|
{
|
||||||
final CyclicBarrier barrier[] = { new CyclicBarrier(2), new CyclicBarrier(2)};
|
final CyclicBarrier barrier[] = {new CyclicBarrier(2), new CyclicBarrier(2)};
|
||||||
|
|
||||||
_statsHandler.setHandler(new AbstractHandler()
|
_statsHandler.setHandler(new AbstractHandler()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue