Issue #2226 - SelectChannelEndPoint test cleanup
Applying recommended changes from review. Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
parent
a437466996
commit
a107bdd2bb
|
@ -1,210 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLEngineResult;
|
||||
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
|
||||
import javax.net.ssl.SSLSocket;
|
||||
|
||||
import org.eclipse.jetty.io.ssl.SslConnection;
|
||||
import org.eclipse.jetty.toolchain.test.JDK;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
import org.eclipse.jetty.toolchain.test.annotation.Stress;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
|
||||
public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
|
||||
{
|
||||
private static SslContextFactory __sslCtxFactory=new SslContextFactory();
|
||||
private static ByteBufferPool __byteBufferPool = new MappedByteBufferPool();
|
||||
|
||||
@BeforeClass
|
||||
public static void initSslEngine() throws Exception
|
||||
{
|
||||
File keystore = MavenTestingUtils.getTestResourceFile("keystore");
|
||||
__sslCtxFactory.setKeyStorePath(keystore.getAbsolutePath());
|
||||
__sslCtxFactory.setKeyStorePassword("storepwd");
|
||||
__sslCtxFactory.setKeyManagerPassword("keypwd");
|
||||
__sslCtxFactory.setEndpointIdentificationAlgorithm("");
|
||||
__sslCtxFactory.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Socket newClient() throws IOException
|
||||
{
|
||||
SSLSocket socket = __sslCtxFactory.newSslSocket();
|
||||
socket.connect(_connector.socket().getLocalSocketAddress());
|
||||
return socket;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection newConnection(SelectableChannel channel, EndPoint endpoint)
|
||||
{
|
||||
SSLEngine engine = __sslCtxFactory.newSSLEngine();
|
||||
engine.setUseClientMode(false);
|
||||
SslConnection sslConnection = new SslConnection(__byteBufferPool, _threadPool, endpoint, engine);
|
||||
sslConnection.setRenegotiationAllowed(__sslCtxFactory.isRenegotiationAllowed());
|
||||
sslConnection.setRenegotiationLimit(__sslCtxFactory.getRenegotiationLimit());
|
||||
Connection appConnection = super.newConnection(channel,sslConnection.getDecryptedEndPoint());
|
||||
sslConnection.getDecryptedEndPoint().setConnection(appConnection);
|
||||
return sslConnection;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testEcho() throws Exception
|
||||
{
|
||||
super.testEcho();
|
||||
}
|
||||
|
||||
@Ignore // SSL does not do half closes
|
||||
@Override
|
||||
public void testShutdown() throws Exception
|
||||
{
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testWriteBlocked() throws Exception
|
||||
{
|
||||
super.testWriteBlocked();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testReadBlocked() throws Exception
|
||||
{
|
||||
super.testReadBlocked();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testIdle() throws Exception
|
||||
{
|
||||
super.testIdle();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
@Stress("Requires a relatively idle (network wise) environment")
|
||||
public void testStress() throws Exception
|
||||
{
|
||||
super.testStress();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkSslEngineBehaviour() throws Exception
|
||||
{
|
||||
Assume.assumeFalse(JDK.IS_9);
|
||||
|
||||
SSLEngine server = __sslCtxFactory.newSSLEngine();
|
||||
SSLEngine client = __sslCtxFactory.newSSLEngine();
|
||||
|
||||
ByteBuffer netC2S = ByteBuffer.allocate(server.getSession().getPacketBufferSize());
|
||||
ByteBuffer netS2C = ByteBuffer.allocate(server.getSession().getPacketBufferSize());
|
||||
ByteBuffer serverIn = ByteBuffer.allocate(server.getSession().getApplicationBufferSize());
|
||||
ByteBuffer serverOut = ByteBuffer.allocate(server.getSession().getApplicationBufferSize());
|
||||
ByteBuffer clientIn = ByteBuffer.allocate(client.getSession().getApplicationBufferSize());
|
||||
|
||||
SSLEngineResult result;
|
||||
|
||||
// start the client
|
||||
client.setUseClientMode(true);
|
||||
client.beginHandshake();
|
||||
Assert.assertEquals(HandshakeStatus.NEED_WRAP,client.getHandshakeStatus());
|
||||
|
||||
// what if we try an unwrap?
|
||||
netS2C.flip();
|
||||
result=client.unwrap(netS2C,clientIn);
|
||||
// unwrap is a noop
|
||||
assertEquals(SSLEngineResult.Status.OK,result.getStatus());
|
||||
assertEquals(0,result.bytesConsumed());
|
||||
assertEquals(0,result.bytesProduced());
|
||||
assertEquals(HandshakeStatus.NEED_WRAP,result.getHandshakeStatus());
|
||||
netS2C.clear();
|
||||
|
||||
// do the needed WRAP of empty buffer
|
||||
result=client.wrap(BufferUtil.EMPTY_BUFFER,netC2S);
|
||||
// unwrap is a noop
|
||||
assertEquals(SSLEngineResult.Status.OK,result.getStatus());
|
||||
assertEquals(0,result.bytesConsumed());
|
||||
assertThat(result.bytesProduced(),greaterThan(0));
|
||||
assertEquals(HandshakeStatus.NEED_UNWRAP,result.getHandshakeStatus());
|
||||
netC2S.flip();
|
||||
assertEquals(netC2S.remaining(),result.bytesProduced());
|
||||
|
||||
// start the server
|
||||
server.setUseClientMode(false);
|
||||
server.beginHandshake();
|
||||
Assert.assertEquals(HandshakeStatus.NEED_UNWRAP,server.getHandshakeStatus());
|
||||
|
||||
// what if we try a needless wrap?
|
||||
serverOut.put(BufferUtil.toBuffer("Hello World"));
|
||||
serverOut.flip();
|
||||
result=server.wrap(serverOut,netS2C);
|
||||
// wrap is a noop
|
||||
assertEquals(SSLEngineResult.Status.OK,result.getStatus());
|
||||
assertEquals(0,result.bytesConsumed());
|
||||
assertEquals(0,result.bytesProduced());
|
||||
assertEquals(HandshakeStatus.NEED_UNWRAP,result.getHandshakeStatus());
|
||||
|
||||
// Do the needed unwrap, to an empty buffer
|
||||
result=server.unwrap(netC2S,BufferUtil.EMPTY_BUFFER);
|
||||
assertEquals(SSLEngineResult.Status.BUFFER_OVERFLOW,result.getStatus());
|
||||
assertEquals(0,result.bytesConsumed());
|
||||
assertEquals(0,result.bytesProduced());
|
||||
assertEquals(HandshakeStatus.NEED_UNWRAP,result.getHandshakeStatus());
|
||||
|
||||
// Do the needed unwrap, to a full buffer
|
||||
serverIn.position(serverIn.limit());
|
||||
result=server.unwrap(netC2S,serverIn);
|
||||
assertEquals(SSLEngineResult.Status.BUFFER_OVERFLOW,result.getStatus());
|
||||
assertEquals(0,result.bytesConsumed());
|
||||
assertEquals(0,result.bytesProduced());
|
||||
assertEquals(HandshakeStatus.NEED_UNWRAP,result.getHandshakeStatus());
|
||||
|
||||
// Do the needed unwrap, to an empty buffer
|
||||
serverIn.clear();
|
||||
result=server.unwrap(netC2S,serverIn);
|
||||
assertEquals(SSLEngineResult.Status.OK,result.getStatus());
|
||||
assertThat(result.bytesConsumed(),greaterThan(0));
|
||||
assertEquals(0,result.bytesProduced());
|
||||
assertEquals(HandshakeStatus.NEED_TASK,result.getHandshakeStatus());
|
||||
|
||||
server.getDelegatedTask().run();
|
||||
|
||||
assertEquals(HandshakeStatus.NEED_WRAP,server.getHandshakeStatus());
|
||||
}
|
||||
}
|
|
@ -1,837 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.eclipse.jetty.util.thread.TimerScheduler;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SelectChannelEndPointTest
|
||||
{
|
||||
@Rule
|
||||
public TestTracker tracker = new TestTracker();
|
||||
|
||||
private static final Logger LOG = Log.getLogger(SelectChannelEndPointTest.class);
|
||||
protected CountDownLatch _lastEndPointLatch;
|
||||
protected volatile EndPoint _lastEndPoint;
|
||||
protected ServerSocketChannel _connector;
|
||||
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
|
||||
protected Scheduler _scheduler = new TimerScheduler();
|
||||
protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler)
|
||||
{
|
||||
@Override
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
|
||||
{
|
||||
return SelectChannelEndPointTest.this.newConnection(channel, endpoint);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
|
||||
{
|
||||
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
|
||||
endp.setIdleTimeout(60000);
|
||||
_lastEndPoint = endp;
|
||||
_lastEndPointLatch.countDown();
|
||||
return endp;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
// Must be volatile or the test may fail spuriously
|
||||
protected volatile int _blockAt = 0;
|
||||
private volatile int _writeCount = 1;
|
||||
|
||||
@Before
|
||||
public void startManager() throws Exception
|
||||
{
|
||||
System.gc();
|
||||
_writeCount = 1;
|
||||
_lastEndPoint = null;
|
||||
_lastEndPointLatch = new CountDownLatch(1);
|
||||
_connector = ServerSocketChannel.open();
|
||||
_connector.socket().bind(null);
|
||||
_scheduler.start();
|
||||
_threadPool.start();
|
||||
_manager.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopManager() throws Exception
|
||||
{
|
||||
_scheduler.stop();
|
||||
_manager.stop();
|
||||
_threadPool.stop();
|
||||
_connector.close();
|
||||
}
|
||||
|
||||
protected Socket newClient() throws IOException
|
||||
{
|
||||
return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort());
|
||||
}
|
||||
|
||||
protected Connection newConnection(SelectableChannel channel, EndPoint endpoint)
|
||||
{
|
||||
return new TestConnection(endpoint);
|
||||
}
|
||||
|
||||
public class TestConnection extends AbstractConnection
|
||||
{
|
||||
volatile FutureCallback _blockingRead;
|
||||
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
|
||||
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
|
||||
long _last = -1;
|
||||
final CountDownLatch _latch;
|
||||
|
||||
public TestConnection(EndPoint endp)
|
||||
{
|
||||
super(endp, _threadPool);
|
||||
_latch=null;
|
||||
}
|
||||
|
||||
public TestConnection(EndPoint endp,CountDownLatch latch)
|
||||
{
|
||||
super(endp, _threadPool);
|
||||
_latch=latch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
super.onOpen();
|
||||
fillInterested();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFillInterestedFailed(Throwable cause)
|
||||
{
|
||||
Callback blocking = _blockingRead;
|
||||
if (blocking!=null)
|
||||
{
|
||||
_blockingRead=null;
|
||||
blocking.failed(cause);
|
||||
return;
|
||||
}
|
||||
super.onFillInterestedFailed(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFillable()
|
||||
{
|
||||
if (_latch!=null)
|
||||
{
|
||||
try
|
||||
{
|
||||
_latch.await();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
Callback blocking = _blockingRead;
|
||||
if (blocking!=null)
|
||||
{
|
||||
_blockingRead=null;
|
||||
blocking.succeeded();
|
||||
return;
|
||||
}
|
||||
|
||||
EndPoint _endp = getEndPoint();
|
||||
try
|
||||
{
|
||||
_last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
boolean progress = true;
|
||||
while (progress)
|
||||
{
|
||||
progress = false;
|
||||
|
||||
// Fill the input buffer with everything available
|
||||
BufferUtil.compact(_in);
|
||||
if (BufferUtil.isFull(_in))
|
||||
throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in));
|
||||
int filled = _endp.fill(_in);
|
||||
if (filled > 0)
|
||||
progress = true;
|
||||
|
||||
// If the tests wants to block, then block
|
||||
while (_blockAt > 0 && _endp.isOpen() && _in.remaining() < _blockAt)
|
||||
{
|
||||
FutureCallback future = _blockingRead = new FutureCallback();
|
||||
fillInterested();
|
||||
future.get();
|
||||
filled = _endp.fill(_in);
|
||||
progress |= filled > 0;
|
||||
}
|
||||
|
||||
// Copy to the out buffer
|
||||
if (BufferUtil.hasContent(_in) && BufferUtil.append(_out, _in) > 0)
|
||||
progress = true;
|
||||
|
||||
// Blocking writes
|
||||
if (BufferUtil.hasContent(_out))
|
||||
{
|
||||
ByteBuffer out = _out.duplicate();
|
||||
BufferUtil.clear(_out);
|
||||
for (int i = 0; i < _writeCount; i++)
|
||||
{
|
||||
FutureCallback blockingWrite = new FutureCallback();
|
||||
_endp.write(blockingWrite, out.asReadOnlyBuffer());
|
||||
blockingWrite.get();
|
||||
}
|
||||
progress = true;
|
||||
}
|
||||
|
||||
// are we done?
|
||||
if (_endp.isInputShutdown())
|
||||
_endp.shutdownOutput();
|
||||
}
|
||||
|
||||
if (_endp.isOpen())
|
||||
fillInterested();
|
||||
}
|
||||
catch (ExecutionException e)
|
||||
{
|
||||
// Timeout does not close, so echo exception then shutdown
|
||||
try
|
||||
{
|
||||
FutureCallback blockingWrite = new FutureCallback();
|
||||
_endp.write(blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in)));
|
||||
blockingWrite.get();
|
||||
_endp.shutdownOutput();
|
||||
}
|
||||
catch (Exception e2)
|
||||
{
|
||||
// e2.printStackTrace();
|
||||
}
|
||||
}
|
||||
catch (InterruptedException | EofException e)
|
||||
{
|
||||
Log.getRootLogger().ignore(e);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.getRootLogger().warn(e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEcho() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
client.setSoTimeout(60000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
|
||||
// wait for read timeout
|
||||
client.setSoTimeout(500);
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
try
|
||||
{
|
||||
client.getInputStream().read();
|
||||
Assert.fail();
|
||||
}
|
||||
catch (SocketTimeoutException e)
|
||||
{
|
||||
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
|
||||
Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L));
|
||||
}
|
||||
|
||||
// write then shutdown
|
||||
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "Goodbye Cruel TLS".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
Assert.assertThat("expect valid char integer", b, greaterThan(0));
|
||||
assertEquals("expect characters to be same", c, (char)b);
|
||||
}
|
||||
client.close();
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
if (server.isOpen())
|
||||
Thread.sleep(10);
|
||||
else
|
||||
break;
|
||||
}
|
||||
assertFalse(server.isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShutdown() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
client.setSoTimeout(500);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
|
||||
// wait for read timeout
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
try
|
||||
{
|
||||
client.getInputStream().read();
|
||||
Assert.fail();
|
||||
}
|
||||
catch (SocketTimeoutException e)
|
||||
{
|
||||
assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start >= 400);
|
||||
}
|
||||
|
||||
// write then shutdown
|
||||
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8));
|
||||
client.shutdownOutput();
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "Goodbye Cruel TLS".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
|
||||
// Read close
|
||||
assertEquals(-1, client.getInputStream().read());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadBlocked() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
OutputStream clientOutputStream = client.getOutputStream();
|
||||
InputStream clientInputStream = client.getInputStream();
|
||||
|
||||
int specifiedTimeout = 1000;
|
||||
client.setSoTimeout(specifiedTimeout);
|
||||
|
||||
// Write 8 and cause block waiting for 10
|
||||
_blockAt = 10;
|
||||
clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
|
||||
clientOutputStream.flush();
|
||||
|
||||
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
|
||||
_lastEndPoint.setIdleTimeout(10 * specifiedTimeout);
|
||||
Thread.sleep((11 * specifiedTimeout) / 10);
|
||||
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
try
|
||||
{
|
||||
int b = clientInputStream.read();
|
||||
Assert.fail("Should have timed out waiting for a response, but read " + b);
|
||||
}
|
||||
catch (SocketTimeoutException e)
|
||||
{
|
||||
int elapsed = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start).intValue();
|
||||
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4));
|
||||
}
|
||||
|
||||
// write remaining characters
|
||||
clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8));
|
||||
clientOutputStream.flush();
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "1234567890ABCDEF".toCharArray())
|
||||
{
|
||||
int b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdle() throws Exception
|
||||
{
|
||||
int idleTimeout = 2000;
|
||||
|
||||
Socket client = newClient();
|
||||
|
||||
client.setSoTimeout(idleTimeout*10);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
Assert.assertTrue(_lastEndPointLatch.await(10, TimeUnit.SECONDS));
|
||||
_lastEndPoint.setIdleTimeout(idleTimeout);
|
||||
|
||||
// Write client to server
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
|
||||
// read until idle shutdown received
|
||||
int b = client.getInputStream().read();
|
||||
assertEquals(-1, b);
|
||||
long idle = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
|
||||
assertThat(idle, greaterThan(idleTimeout - 100L));
|
||||
assertThat(idle, lessThan(idleTimeout * 2L));
|
||||
|
||||
// But endpoint may still be open for a little bit.
|
||||
for (int i = 0; i < 20; ++i)
|
||||
{
|
||||
if (_lastEndPoint.isOpen())
|
||||
Thread.sleep(2 * idleTimeout / 10);
|
||||
else
|
||||
break;
|
||||
}
|
||||
assertFalse(_lastEndPoint.isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockedReadIdle() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
InputStream clientInputStream = client.getInputStream();
|
||||
OutputStream clientOutputStream = client.getOutputStream();
|
||||
|
||||
client.setSoTimeout(5000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
clientOutputStream.write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
|
||||
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
|
||||
int idleTimeout = 500;
|
||||
_lastEndPoint.setIdleTimeout(idleTimeout);
|
||||
|
||||
// Write 8 and cause block waiting for 10
|
||||
_blockAt = 10;
|
||||
clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
|
||||
clientOutputStream.flush();
|
||||
|
||||
// read until idle shutdown received
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
int b = clientInputStream.read();
|
||||
assertEquals('E', b);
|
||||
long idle = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
|
||||
assertTrue(idle > idleTimeout / 2);
|
||||
assertTrue(idle < idleTimeout * 2);
|
||||
|
||||
for (char c : "E: 12345678".toCharArray())
|
||||
{
|
||||
b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
b = clientInputStream.read();
|
||||
assertEquals(-1,b);
|
||||
|
||||
// But endpoint is still open.
|
||||
if(_lastEndPoint.isOpen())
|
||||
// Wait for another idle callback
|
||||
Thread.sleep(idleTimeout * 2);
|
||||
|
||||
// endpoint is closed.
|
||||
assertFalse(_lastEndPoint.isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStress() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
client.setSoTimeout(30000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
final int writes = 200000;
|
||||
|
||||
final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8);
|
||||
byte[] count = "0\n".getBytes(StandardCharsets.UTF_8);
|
||||
BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream());
|
||||
final CountDownLatch latch = new CountDownLatch(writes);
|
||||
final InputStream in = new BufferedInputStream(client.getInputStream());
|
||||
final long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
out.write(bytes);
|
||||
out.write(count);
|
||||
out.flush();
|
||||
|
||||
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
|
||||
_lastEndPoint.setIdleTimeout(5000);
|
||||
|
||||
new Thread()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
Thread.currentThread().setPriority(MAX_PRIORITY);
|
||||
long last = -1;
|
||||
int count = -1;
|
||||
try
|
||||
{
|
||||
while (latch.getCount() > 0)
|
||||
{
|
||||
// Verify echo server to client
|
||||
for (byte b0 : bytes)
|
||||
{
|
||||
int b = in.read();
|
||||
Assert.assertThat(b, greaterThan(0));
|
||||
assertEquals(0xff & b0, b);
|
||||
}
|
||||
|
||||
count = 0;
|
||||
int b = in.read();
|
||||
while (b > 0 && b != '\n')
|
||||
{
|
||||
count = count * 10 + (b - '0');
|
||||
b = in.read();
|
||||
}
|
||||
last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
|
||||
//if (latch.getCount()%1000==0)
|
||||
// System.out.println(writes-latch.getCount());
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
|
||||
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
System.err.println("count=" + count);
|
||||
System.err.println("latch=" + latch.getCount());
|
||||
System.err.println("time=" + (now - start));
|
||||
System.err.println("last=" + (now - last));
|
||||
System.err.println("endp=" + _lastEndPoint);
|
||||
System.err.println("conn=" + _lastEndPoint.getConnection());
|
||||
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
|
||||
// Write client to server
|
||||
for (int i = 1; i < writes; i++)
|
||||
{
|
||||
out.write(bytes);
|
||||
out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1));
|
||||
out.write('\n');
|
||||
if (i % 1000 == 0)
|
||||
{
|
||||
//System.err.println(i+"/"+writes);
|
||||
out.flush();
|
||||
}
|
||||
Thread.yield();
|
||||
}
|
||||
out.flush();
|
||||
|
||||
long last = latch.getCount();
|
||||
while (!latch.await(5, TimeUnit.SECONDS))
|
||||
{
|
||||
//System.err.println(latch.getCount());
|
||||
if (latch.getCount() == last)
|
||||
Assert.fail();
|
||||
last = latch.getCount();
|
||||
}
|
||||
|
||||
assertEquals(0, latch.getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteBlocked() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
|
||||
client.setSoTimeout(10000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
_writeCount = 10000;
|
||||
String data = "Now is the time for all good men to come to the aid of the party";
|
||||
client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
|
||||
BufferedInputStream in = new BufferedInputStream(client.getInputStream());
|
||||
|
||||
int byteNum = 0;
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < _writeCount; i++)
|
||||
{
|
||||
if (i % 1000 == 0)
|
||||
TimeUnit.MILLISECONDS.sleep(200);
|
||||
|
||||
// Verify echo server to client
|
||||
for (int j = 0; j < data.length(); j++)
|
||||
{
|
||||
char c = data.charAt(j);
|
||||
int b = in.read();
|
||||
byteNum++;
|
||||
assertTrue(b > 0);
|
||||
assertEquals("test-" + i + "/" + j,c,(char)b);
|
||||
}
|
||||
|
||||
if (i == 0)
|
||||
_lastEndPoint.setIdleTimeout(60000);
|
||||
}
|
||||
}
|
||||
catch (SocketTimeoutException e)
|
||||
{
|
||||
System.err.println("SelectorManager.dump() = " + _manager.dump());
|
||||
LOG.warn("Server: " + server);
|
||||
LOG.warn("Error reading byte #" + byteNum,e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
client.close();
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
if (server.isOpen())
|
||||
Thread.sleep(10);
|
||||
else
|
||||
break;
|
||||
}
|
||||
assertFalse(server.isOpen());
|
||||
}
|
||||
|
||||
|
||||
// TODO make this test reliable
|
||||
@Test
|
||||
@Ignore
|
||||
public void testRejectedExecution() throws Exception
|
||||
{
|
||||
_manager.stop();
|
||||
_threadPool.stop();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4);
|
||||
_threadPool = new QueuedThreadPool(4,4,60000,q);
|
||||
_manager = new SelectorManager(_threadPool, _scheduler, 1)
|
||||
{
|
||||
|
||||
@Override
|
||||
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
|
||||
{
|
||||
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel,selector,selectionKey,getScheduler());
|
||||
_lastEndPoint = endp;
|
||||
_lastEndPointLatch.countDown();
|
||||
return endp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
|
||||
{
|
||||
return new TestConnection(endpoint,latch);
|
||||
}
|
||||
};
|
||||
|
||||
_threadPool.start();
|
||||
_manager.start();
|
||||
|
||||
AtomicInteger timeout = new AtomicInteger();
|
||||
AtomicInteger rejections = new AtomicInteger();
|
||||
AtomicInteger echoed = new AtomicInteger();
|
||||
|
||||
CountDownLatch closed = new CountDownLatch(20);
|
||||
for (int i=0;i<20;i++)
|
||||
{
|
||||
new Thread()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try(Socket client = newClient();)
|
||||
{
|
||||
client.setSoTimeout(5000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
client.getOutputStream().flush();
|
||||
client.shutdownOutput();
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
assertEquals(-1,client.getInputStream().read());
|
||||
echoed.incrementAndGet();
|
||||
}
|
||||
catch(SocketTimeoutException x)
|
||||
{
|
||||
x.printStackTrace();
|
||||
timeout.incrementAndGet();
|
||||
}
|
||||
catch(Throwable x)
|
||||
{
|
||||
rejections.incrementAndGet();
|
||||
}
|
||||
finally
|
||||
{
|
||||
closed.countDown();
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
|
||||
// unblock the handling
|
||||
latch.countDown();
|
||||
|
||||
// wait for all clients to complete or fail
|
||||
closed.await();
|
||||
|
||||
// assert some clients must have been rejected
|
||||
Assert.assertThat(rejections.get(),Matchers.greaterThan(0));
|
||||
// but not all of them
|
||||
Assert.assertThat(rejections.get(),Matchers.lessThan(20));
|
||||
// none should have timed out
|
||||
Assert.assertThat(timeout.get(),Matchers.equalTo(0));
|
||||
// and the rest should have worked
|
||||
Assert.assertThat(echoed.get(),Matchers.equalTo(20-rejections.get()));
|
||||
|
||||
// and the selector is still working for new requests
|
||||
try(Socket client = newClient();)
|
||||
{
|
||||
client.setSoTimeout(5000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
client.getOutputStream().flush();
|
||||
client.shutdownOutput();
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
assertEquals(-1,client.getInputStream().read());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -20,10 +20,8 @@ package org.eclipse.jetty.io;
|
|||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
|
@ -56,6 +54,7 @@ import javax.net.ssl.SSLSocket;
|
|||
|
||||
import org.eclipse.jetty.io.ssl.SslConnection;
|
||||
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
|
||||
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
|
@ -69,6 +68,7 @@ import org.hamcrest.Matchers;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -83,7 +83,7 @@ public class SocketChannelEndPointTest
|
|||
{
|
||||
Socket newClient(ServerSocketChannel connector) throws IOException;
|
||||
|
||||
Connection newConnection(SelectableChannel channel, EndPoint endPoint, Executor executor, SafeInteger blockAt, SafeInteger writeCount);
|
||||
Connection newConnection(SelectableChannel channel, EndPoint endPoint, Executor executor, AtomicInteger blockAt, AtomicInteger writeCount);
|
||||
|
||||
boolean supportsHalfCloses();
|
||||
}
|
||||
|
@ -100,7 +100,10 @@ public class SocketChannelEndPointTest
|
|||
return ret;
|
||||
}
|
||||
|
||||
public Scenario _scenario;
|
||||
@Rule
|
||||
public TestTracker tracker = new TestTracker();
|
||||
|
||||
private final Scenario _scenario;
|
||||
|
||||
private ServerSocketChannel _connector;
|
||||
private QueuedThreadPool _threadPool;
|
||||
|
@ -110,8 +113,8 @@ public class SocketChannelEndPointTest
|
|||
private CountDownLatch _lastEndPointLatch;
|
||||
|
||||
// Must be volatile or the test may fail spuriously
|
||||
private SafeInteger _blockAt = new SafeInteger("_blockAt", 0);
|
||||
private SafeInteger _writeCount = new SafeInteger("_writeCount", 1);
|
||||
private AtomicInteger _blockAt = new AtomicInteger(0);
|
||||
private AtomicInteger _writeCount = new AtomicInteger(1);
|
||||
|
||||
public SocketChannelEndPointTest(Scenario scenario) throws Exception
|
||||
{
|
||||
|
@ -299,112 +302,6 @@ public class SocketChannelEndPointTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdle() throws Exception
|
||||
{
|
||||
int idleTimeout = 2000;
|
||||
|
||||
Socket client = _scenario.newClient(_connector);
|
||||
|
||||
client.setSoTimeout(idleTimeout * 10);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
Assert.assertTrue(_lastEndPointLatch.await(10, TimeUnit.SECONDS));
|
||||
_lastEndPoint.setIdleTimeout(idleTimeout);
|
||||
|
||||
// Write client to server
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char) b);
|
||||
}
|
||||
|
||||
// read until idle shutdown received
|
||||
int b = client.getInputStream().read();
|
||||
assertEquals(-1, b);
|
||||
long idle = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
|
||||
assertThat(idle, greaterThan(idleTimeout - 100L));
|
||||
assertThat(idle, lessThan(idleTimeout * 2L));
|
||||
|
||||
// But endpoint may still be open for a little bit.
|
||||
for (int i = 0; i < 20; ++i)
|
||||
{
|
||||
if (_lastEndPoint.isOpen())
|
||||
Thread.sleep(2 * idleTimeout / 10);
|
||||
else
|
||||
break;
|
||||
}
|
||||
assertFalse(_lastEndPoint.isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockedReadIdle() throws Exception
|
||||
{
|
||||
Socket client = _scenario.newClient(_connector);
|
||||
InputStream clientInputStream = client.getInputStream();
|
||||
OutputStream clientOutputStream = client.getOutputStream();
|
||||
|
||||
client.setSoTimeout(5000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
clientOutputStream.write("HelloWorld".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char) b);
|
||||
}
|
||||
|
||||
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
|
||||
int idleTimeout = 500;
|
||||
_lastEndPoint.setIdleTimeout(idleTimeout);
|
||||
|
||||
// Write 8 and cause block waiting for 10
|
||||
_blockAt.set(10);
|
||||
clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
|
||||
clientOutputStream.flush();
|
||||
|
||||
// read until idle shutdown received
|
||||
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
int b = clientInputStream.read();
|
||||
assertEquals('E', b);
|
||||
long idle = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
|
||||
assertTrue(idle > idleTimeout / 2);
|
||||
assertTrue(idle < idleTimeout * 2);
|
||||
|
||||
for (char c : "E: 12345678".toCharArray())
|
||||
{
|
||||
b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char) b);
|
||||
}
|
||||
b = clientInputStream.read();
|
||||
assertEquals(-1, b);
|
||||
|
||||
// But endpoint is still open.
|
||||
if (_lastEndPoint.isOpen())
|
||||
// Wait for another idle callback
|
||||
Thread.sleep(idleTimeout * 2);
|
||||
|
||||
// endpoint is closed.
|
||||
assertFalse(_lastEndPoint.isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStress() throws Exception
|
||||
{
|
||||
|
@ -729,7 +626,7 @@ public class SocketChannelEndPointTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Executor executor, SafeInteger blockAt, SafeInteger writeCount)
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Executor executor, AtomicInteger blockAt, AtomicInteger writeCount)
|
||||
{
|
||||
return new TestConnection(endpoint, executor, blockAt, writeCount);
|
||||
}
|
||||
|
@ -773,7 +670,7 @@ public class SocketChannelEndPointTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Executor executor, SafeInteger blockAt, SafeInteger writeCount)
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Executor executor, AtomicInteger blockAt, AtomicInteger writeCount)
|
||||
{
|
||||
SSLEngine engine = __sslCtxFactory.newSSLEngine();
|
||||
engine.setUseClientMode(false);
|
||||
|
@ -798,62 +695,21 @@ public class SocketChannelEndPointTest
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing possibility of a bad test configuration
|
||||
*/
|
||||
public static class SafeInteger
|
||||
{
|
||||
private final String name;
|
||||
private int value;
|
||||
private boolean getCalled = false;
|
||||
private Throwable firstGetThrowable;
|
||||
|
||||
public SafeInteger(String name, int value)
|
||||
{
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public void set(int value)
|
||||
{
|
||||
synchronized(this)
|
||||
{
|
||||
if(getCalled)
|
||||
throw new IllegalStateException(name + ".get() already called, unable to " + name + ".set(" + value + ") now: TOOLATE", firstGetThrowable);
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
public int get()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if(!getCalled)
|
||||
{
|
||||
// first occurrence.
|
||||
firstGetThrowable = new Throwable("First Get Here");
|
||||
}
|
||||
getCalled = true;
|
||||
return this.value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
public static class TestConnection extends AbstractConnection
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(TestConnection.class);
|
||||
|
||||
volatile FutureCallback _blockingRead;
|
||||
final SafeInteger _blockAt;
|
||||
final SafeInteger _writeCount;
|
||||
final AtomicInteger _blockAt;
|
||||
final AtomicInteger _writeCount;
|
||||
// volatile int _blockAt = 0;
|
||||
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
|
||||
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
|
||||
long _last = -1;
|
||||
final CountDownLatch _latch;
|
||||
|
||||
public TestConnection(EndPoint endp, Executor executor, SafeInteger blockAt, SafeInteger writeCount)
|
||||
public TestConnection(EndPoint endp, Executor executor, AtomicInteger blockAt, AtomicInteger writeCount)
|
||||
{
|
||||
super(endp, executor);
|
||||
_latch = null;
|
||||
|
@ -861,7 +717,7 @@ public class SocketChannelEndPointTest
|
|||
this._writeCount = writeCount;
|
||||
}
|
||||
|
||||
public TestConnection(EndPoint endp, CountDownLatch latch, Executor executor, SafeInteger blockAt, SafeInteger writeCount)
|
||||
public TestConnection(EndPoint endp, CountDownLatch latch, Executor executor, AtomicInteger blockAt, AtomicInteger writeCount)
|
||||
{
|
||||
super(endp, executor);
|
||||
_latch = latch;
|
||||
|
|
Loading…
Reference in New Issue