Fixes #2547 - Review usages of ServerSocket[Channel].accept().

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-05-16 18:40:10 +02:00
parent 7a7aa0f1b1
commit 744f9054b3
11 changed files with 1347 additions and 1371 deletions

View File

@ -479,39 +479,41 @@ public class HttpClientTLSTest
latch.countDown(); latch.countDown();
}); });
Socket socket = server.accept(); try (Socket socket = server.accept())
SSLSocket sslSocket = (SSLSocket)serverTLSFactory.getSslContext().getSocketFactory().createSocket(socket, null, socket.getPort(), true);
sslSocket.setUseClientMode(false);
BufferedReader reader = new BufferedReader(new InputStreamReader(sslSocket.getInputStream(), StandardCharsets.UTF_8));
while (true)
{ {
String line = reader.readLine(); SSLSocket sslSocket = (SSLSocket)serverTLSFactory.getSslContext().getSocketFactory().createSocket(socket, null, socket.getPort(), true);
if (line == null || line.isEmpty()) sslSocket.setUseClientMode(false);
break; BufferedReader reader = new BufferedReader(new InputStreamReader(sslSocket.getInputStream(), StandardCharsets.UTF_8));
while (true)
{
String line = reader.readLine();
if (line == null || line.isEmpty())
break;
}
// If the response is Content-Length delimited, allowing the
// missing TLS Close Message is fine because the application
// will see a EOFException anyway.
// If the response is connection delimited, allowing the
// missing TLS Close Message is bad because the application
// will see a successful response with truncated content.
// Verify that by not allowing the missing
// TLS Close Message we get a response failure.
byte[] half = new byte[8];
String response = "HTTP/1.1 200 OK\r\n" +
// "Content-Length: " + (half.length * 2) + "\r\n" +
"Connection: close\r\n" +
"\r\n";
OutputStream output = sslSocket.getOutputStream();
output.write(response.getBytes(StandardCharsets.UTF_8));
output.write(half);
output.flush();
// Simulate a truncation attack by raw closing
// the socket in the try-with-resources block end.
} }
// If the response is Content-Length delimited, allowing the
// missing TLS Close Message is fine because the application
// will see a EOFException anyway.
// If the response is connection delimited, allowing the
// missing TLS Close Message is bad because the application
// will see a successful response with truncated content.
// Verify that by not allowing the missing
// TLS Close Message we get a response failure.
byte[] half = new byte[8];
String response = "HTTP/1.1 200 OK\r\n" +
// "Content-Length: " + (half.length * 2) + "\r\n" +
"Connection: close\r\n" +
"\r\n";
OutputStream output = sslSocket.getOutputStream();
output.write(response.getBytes(StandardCharsets.UTF_8));
output.write(half);
output.flush();
// Simulate a truncation attack by raw closing.
socket.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} }
} }

View File

@ -98,63 +98,66 @@ public class ServerConnectionCloseTest
private void testServerSendsConnectionClose(boolean shutdownOutput, boolean chunked, String content) throws Exception private void testServerSendsConnectionClose(boolean shutdownOutput, boolean chunked, String content) throws Exception
{ {
ServerSocket server = new ServerSocket(0); try (ServerSocket server = new ServerSocket(0))
int port = server.getLocalPort();
startClient();
Request request = client.newRequest("localhost", port).path("/ctx/path");
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
Socket socket = server.accept();
InputStream input = socket.getInputStream();
consumeRequest(input);
OutputStream output = socket.getOutputStream();
String serverResponse = "" +
"HTTP/1.1 200 OK\r\n" +
"Connection: close\r\n";
if (chunked)
{ {
serverResponse += "" + int port = server.getLocalPort();
"Transfer-Encoding: chunked\r\n" +
"\r\n"; startClient();
Request request = client.newRequest("localhost", port).path("/ctx/path");
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
try (Socket socket = server.accept())
{
InputStream input = socket.getInputStream();
consumeRequest(input);
OutputStream output = socket.getOutputStream();
String serverResponse = "" +
"HTTP/1.1 200 OK\r\n" +
"Connection: close\r\n";
if (chunked)
{
serverResponse += "" +
"Transfer-Encoding: chunked\r\n" +
"\r\n";
for (int i = 0; i < 2; ++i) for (int i = 0; i < 2; ++i)
{ {
serverResponse += serverResponse +=
Integer.toHexString(content.length()) + "\r\n" + Integer.toHexString(content.length()) + "\r\n" +
content + "\r\n"; content + "\r\n";
} }
serverResponse += "" + serverResponse += "" +
"0\r\n" + "0\r\n" +
"\r\n"; "\r\n";
}
else
{
serverResponse += "Content-Length: " + content.length() + "\r\n";
serverResponse += "\r\n";
serverResponse += content;
}
output.write(serverResponse.getBytes("UTF-8"));
output.flush();
if (shutdownOutput)
socket.shutdownOutput();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Give some time to process the connection.
Thread.sleep(1000);
// Connection should have been removed from pool.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getIdleConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnectionCount());
}
} }
else
{
serverResponse += "Content-Length: " + content.length() + "\r\n";
serverResponse += "\r\n";
serverResponse += content;
}
output.write(serverResponse.getBytes("UTF-8"));
output.flush();
if (shutdownOutput)
socket.shutdownOutput();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Give some time to process the connection.
Thread.sleep(1000);
// Connection should have been removed from pool.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getIdleConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnectionCount());
} }
private boolean consumeRequest(InputStream input) throws IOException private boolean consumeRequest(InputStream input) throws IOException

View File

@ -79,41 +79,40 @@ public class Socks4ProxyTest
latch.countDown(); latch.countDown();
}); });
SocketChannel channel = server.accept(); try (SocketChannel channel = server.accept())
{
int socks4MessageLength = 9;
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
int read = channel.read(buffer);
Assert.assertEquals(socks4MessageLength, read);
Assert.assertEquals(4, buffer.get(0) & 0xFF);
Assert.assertEquals(1, buffer.get(1) & 0xFF);
Assert.assertEquals(serverPort, buffer.getShort(2) & 0xFFFF);
Assert.assertEquals(ip1, buffer.get(4) & 0xFF);
Assert.assertEquals(ip2, buffer.get(5) & 0xFF);
Assert.assertEquals(ip3, buffer.get(6) & 0xFF);
Assert.assertEquals(ip4, buffer.get(7) & 0xFF);
Assert.assertEquals(0, buffer.get(8) & 0xFF);
int socks4MessageLength = 9; // Socks4 response.
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength); channel.write(ByteBuffer.wrap(new byte[]{0, 0x5A, 0, 0, 0, 0, 0, 0}));
int read = channel.read(buffer);
Assert.assertEquals(socks4MessageLength, read);
Assert.assertEquals(4, buffer.get(0) & 0xFF);
Assert.assertEquals(1, buffer.get(1) & 0xFF);
Assert.assertEquals(serverPort, buffer.getShort(2) & 0xFFFF);
Assert.assertEquals(ip1, buffer.get(4) & 0xFF);
Assert.assertEquals(ip2, buffer.get(5) & 0xFF);
Assert.assertEquals(ip3, buffer.get(6) & 0xFF);
Assert.assertEquals(ip4, buffer.get(7) & 0xFF);
Assert.assertEquals(0, buffer.get(8) & 0xFF);
// Socks4 response. buffer = ByteBuffer.allocate(method.length() + 1 + path.length());
channel.write(ByteBuffer.wrap(new byte[]{0, 0x5A, 0, 0, 0, 0, 0, 0})); read = channel.read(buffer);
Assert.assertEquals(buffer.capacity(), read);
buffer.flip();
Assert.assertEquals(method + " " + path, StandardCharsets.UTF_8.decode(buffer).toString());
buffer = ByteBuffer.allocate(method.length() + 1 + path.length()); // Response
read = channel.read(buffer); String response = "" +
Assert.assertEquals(buffer.capacity(), read); "HTTP/1.1 200 OK\r\n" +
buffer.flip(); "Content-Length: 0\r\n" +
Assert.assertEquals(method + " " + path, StandardCharsets.UTF_8.decode(buffer).toString()); "Connection: close\r\n" +
"\r\n";
channel.write(ByteBuffer.wrap(response.getBytes("UTF-8")));
// Response Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
String response = "" + }
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 0\r\n" +
"Connection: close\r\n" +
"\r\n";
channel.write(ByteBuffer.wrap(response.getBytes("UTF-8")));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
channel.close();
} }
@Test @Test
@ -139,39 +138,38 @@ public class Socks4ProxyTest
result.getFailure().printStackTrace(); result.getFailure().printStackTrace();
}); });
SocketChannel channel = server.accept(); try (SocketChannel channel = server.accept())
{
int socks4MessageLength = 9;
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
int read = channel.read(buffer);
Assert.assertEquals(socks4MessageLength, read);
int socks4MessageLength = 9; // Socks4 response, with split bytes.
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength); byte[] chunk1 = new byte[]{0, 0x5A, 0};
int read = channel.read(buffer); byte[] chunk2 = new byte[]{0, 0, 0, 0, 0};
Assert.assertEquals(socks4MessageLength, read); channel.write(ByteBuffer.wrap(chunk1));
// Socks4 response, with split bytes. // Wait before sending the second chunk.
byte[] chunk1 = new byte[]{0, 0x5A, 0}; Thread.sleep(1000);
byte[] chunk2 = new byte[]{0, 0, 0, 0, 0};
channel.write(ByteBuffer.wrap(chunk1));
// Wait before sending the second chunk. channel.write(ByteBuffer.wrap(chunk2));
Thread.sleep(1000);
channel.write(ByteBuffer.wrap(chunk2)); buffer = ByteBuffer.allocate(method.length());
read = channel.read(buffer);
Assert.assertEquals(buffer.capacity(), read);
buffer.flip();
Assert.assertEquals(method, StandardCharsets.UTF_8.decode(buffer).toString());
buffer = ByteBuffer.allocate(method.length()); // Response
read = channel.read(buffer); String response = "" +
Assert.assertEquals(buffer.capacity(), read); "HTTP/1.1 200 OK\r\n" +
buffer.flip(); "Content-Length: 0\r\n" +
Assert.assertEquals(method, StandardCharsets.UTF_8.decode(buffer).toString()); "Connection: close\r\n" +
"\r\n";
channel.write(ByteBuffer.wrap(response.getBytes("UTF-8")));
// Response Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
String response = "" + }
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 0\r\n" +
"Connection: close\r\n" +
"\r\n";
channel.write(ByteBuffer.wrap(response.getBytes("UTF-8")));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
channel.close();
} }
} }

View File

@ -104,87 +104,91 @@ public class TLSServerConnectionCloseTest
private void testServerSendsConnectionClose(boolean chunked, String content) throws Exception private void testServerSendsConnectionClose(boolean chunked, String content) throws Exception
{ {
ServerSocket server = new ServerSocket(0); try (ServerSocket server = new ServerSocket(0))
int port = server.getLocalPort();
startClient();
Request request = client.newRequest("localhost", port).scheme("https").path("/ctx/path");
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
Socket socket = server.accept();
SSLContext sslContext = client.getSslContextFactory().getSslContext();
SSLSocket sslSocket = (SSLSocket)sslContext.getSocketFactory().createSocket(socket, "localhost", port, false);
sslSocket.setUseClientMode(false);
sslSocket.startHandshake();
InputStream input = sslSocket.getInputStream();
consumeRequest(input);
OutputStream output = sslSocket.getOutputStream();
String serverResponse = "" +
"HTTP/1.1 200 OK\r\n" +
"Connection: close\r\n";
if (chunked)
{ {
serverResponse += "" + int port = server.getLocalPort();
"Transfer-Encoding: chunked\r\n" +
"\r\n"; startClient();
Request request = client.newRequest("localhost", port).scheme("https").path("/ctx/path");
FutureResponseListener listener = new FutureResponseListener(request);
request.send(listener);
try (Socket socket = server.accept())
{
SSLContext sslContext = client.getSslContextFactory().getSslContext();
SSLSocket sslSocket = (SSLSocket)sslContext.getSocketFactory().createSocket(socket, "localhost", port, false);
sslSocket.setUseClientMode(false);
sslSocket.startHandshake();
InputStream input = sslSocket.getInputStream();
consumeRequest(input);
OutputStream output = sslSocket.getOutputStream();
String serverResponse = "" +
"HTTP/1.1 200 OK\r\n" +
"Connection: close\r\n";
if (chunked)
{
serverResponse += "" +
"Transfer-Encoding: chunked\r\n" +
"\r\n";
for (int i = 0; i < 2; ++i) for (int i = 0; i < 2; ++i)
{ {
serverResponse += serverResponse +=
Integer.toHexString(content.length()) + "\r\n" + Integer.toHexString(content.length()) + "\r\n" +
content + "\r\n"; content + "\r\n";
} }
serverResponse += "" + serverResponse += "" +
"0\r\n" + "0\r\n" +
"\r\n"; "\r\n";
} }
else else
{ {
serverResponse += "Content-Length: " + content.length() + "\r\n"; serverResponse += "Content-Length: " + content.length() + "\r\n";
serverResponse += "\r\n"; serverResponse += "\r\n";
serverResponse += content; serverResponse += content;
} }
output.write(serverResponse.getBytes("UTF-8")); output.write(serverResponse.getBytes("UTF-8"));
output.flush(); output.flush();
switch (closeMode) switch (closeMode)
{ {
case NONE: case NONE:
{ {
break; break;
} }
case CLOSE: case CLOSE:
{ {
sslSocket.close(); sslSocket.close();
break; break;
} }
case ABRUPT: case ABRUPT:
{ {
socket.shutdownOutput(); socket.shutdownOutput();
break; break;
} }
default: default:
{ {
throw new IllegalStateException(); throw new IllegalStateException();
}
}
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Give some time to process the connection.
Thread.sleep(1000);
// Connection should have been removed from pool.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getIdleConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnectionCount());
} }
} }
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Give some time to process the connection.
Thread.sleep(1000);
// Connection should have been removed from pool.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getIdleConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnectionCount());
} }
private boolean consumeRequest(InputStream input) throws IOException private boolean consumeRequest(InputStream input) throws IOException

View File

@ -101,72 +101,72 @@ public class SslBytesClientTest extends SslBytesTest
Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS)); Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS));
final SSLSocket server = (SSLSocket)acceptor.accept(); try (SSLSocket server = (SSLSocket)acceptor.accept())
server.setUseClientMode(false);
Future<Object> handshake = threadPool.submit(() ->
{ {
server.startHandshake(); server.setUseClientMode(false);
return null;
});
// Client Hello Future<Object> handshake = threadPool.submit(() ->
TLSRecord record = proxy.readFromClient(); {
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); server.startHandshake();
proxy.flushToServer(record); return null;
});
// Server Hello + Certificate + Server Done // Client Hello
record = proxy.readFromServer(); TLSRecord record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record); proxy.flushToServer(record);
// Client Key Exchange // Server Hello + Certificate + Server Done
record = proxy.readFromClient(); record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToServer(record); proxy.flushToClient(record);
// Change Cipher Spec // Client Key Exchange
record = proxy.readFromClient(); record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToServer(record); proxy.flushToServer(record);
// Client Done // Change Cipher Spec
record = proxy.readFromClient(); record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType());
proxy.flushToServer(record); proxy.flushToServer(record);
// Change Cipher Spec // Client Done
record = proxy.readFromServer(); record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record); proxy.flushToServer(record);
// Server Done // Change Cipher Spec
record = proxy.readFromServer(); record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType());
proxy.flushToClient(record); proxy.flushToClient(record);
Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); // Server Done
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record);
SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); Assert.assertNull(handshake.get(5, TimeUnit.SECONDS));
// Read request
BufferedReader reader = new BufferedReader(new InputStreamReader(server.getInputStream(), StandardCharsets.UTF_8));
String line = reader.readLine();
Assert.assertTrue(line.startsWith("GET"));
while (line.length() > 0)
line = reader.readLine();
// Write response SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow();
OutputStream output = server.getOutputStream(); // Read request
output.write(("HTTP/1.1 200 OK\r\n" + BufferedReader reader = new BufferedReader(new InputStreamReader(server.getInputStream(), StandardCharsets.UTF_8));
"Content-Length: 0\r\n" + String line = reader.readLine();
"\r\n").getBytes(StandardCharsets.UTF_8)); Assert.assertTrue(line.startsWith("GET"));
output.flush(); while (line.length() > 0)
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); line = reader.readLine();
ContentResponse response = listener.get(5, TimeUnit.SECONDS); // Write response
Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); OutputStream output = server.getOutputStream();
output.write(("HTTP/1.1 200 OK\r\n" +
"Content-Length: 0\r\n" +
"\r\n").getBytes(StandardCharsets.UTF_8));
output.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
server.close(); ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
}
} }
@Test @Test
@ -178,109 +178,109 @@ public class SslBytesClientTest extends SslBytesTest
Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS)); Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS));
final SSLSocket server = (SSLSocket)acceptor.accept(); try (SSLSocket server = (SSLSocket)acceptor.accept())
server.setUseClientMode(false);
Future<Object> handshake = threadPool.submit(() ->
{ {
server.startHandshake(); server.setUseClientMode(false);
return null;
});
SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); Future<Object> handshake = threadPool.submit(() ->
Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); {
server.startHandshake();
return null;
});
// Read request SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow();
InputStream serverInput = server.getInputStream(); Assert.assertNull(handshake.get(5, TimeUnit.SECONDS));
BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8));
String line = reader.readLine();
Assert.assertTrue(line.startsWith("GET"));
while (line.length() > 0)
line = reader.readLine();
OutputStream serverOutput = server.getOutputStream(); // Read request
byte[] data1 = new byte[1024]; InputStream serverInput = server.getInputStream();
Arrays.fill(data1, (byte)'X'); BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8));
String content1 = new String(data1, StandardCharsets.UTF_8); String line = reader.readLine();
byte[] data2 = new byte[1024]; Assert.assertTrue(line.startsWith("GET"));
Arrays.fill(data2, (byte)'Y'); while (line.length() > 0)
final String content2 = new String(data2, StandardCharsets.UTF_8); line = reader.readLine();
// Write first part of the response
serverOutput.write(("HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1).getBytes(StandardCharsets.UTF_8));
serverOutput.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
// Renegotiate OutputStream serverOutput = server.getOutputStream();
Future<Object> renegotiation = threadPool.submit(() -> byte[] data1 = new byte[1024];
{ Arrays.fill(data1, (byte)'X');
server.startHandshake(); String content1 = new String(data1, StandardCharsets.UTF_8);
return null; byte[] data2 = new byte[1024];
}); Arrays.fill(data2, (byte)'Y');
final String content2 = new String(data2, StandardCharsets.UTF_8);
// Write first part of the response
serverOutput.write(("HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1).getBytes(StandardCharsets.UTF_8));
serverOutput.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
// Renegotiation Handshake // Renegotiate
TLSRecord record = proxy.readFromServer(); Future<Object> renegotiation = threadPool.submit(() ->
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); {
proxy.flushToClient(record); server.startHandshake();
return null;
});
// Renegotiation Handshake // Renegotiation Handshake
record = proxy.readFromClient(); TLSRecord record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToServer(record); proxy.flushToClient(record);
// Trigger a read to have the server write the final renegotiation steps // Renegotiation Handshake
server.setSoTimeout(100); record = proxy.readFromClient();
try Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
{ proxy.flushToServer(record);
serverInput.read();
Assert.fail(); // Trigger a read to have the server write the final renegotiation steps
server.setSoTimeout(100);
try
{
serverInput.read();
Assert.fail();
}
catch (SocketTimeoutException x)
{
// Expected
}
// Renegotiation Handshake
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record);
// Renegotiation Change Cipher
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType());
proxy.flushToClient(record);
// Renegotiation Handshake
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record);
// Renegotiation Change Cipher
record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType());
proxy.flushToServer(record);
// Renegotiation Handshake
record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToServer(record);
Assert.assertNull(renegotiation.get(5, TimeUnit.SECONDS));
// Complete the response
automaticProxyFlow = proxy.startAutomaticFlow();
serverOutput.write(data2);
serverOutput.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(data1.length + data2.length, response.getContent().length);
} }
catch (SocketTimeoutException x)
{
// Expected
}
// Renegotiation Handshake
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record);
// Renegotiation Change Cipher
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType());
proxy.flushToClient(record);
// Renegotiation Handshake
record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToClient(record);
// Renegotiation Change Cipher
record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType());
proxy.flushToServer(record);
// Renegotiation Handshake
record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
proxy.flushToServer(record);
Assert.assertNull(renegotiation.get(5, TimeUnit.SECONDS));
// Complete the response
automaticProxyFlow = proxy.startAutomaticFlow();
serverOutput.write(data2);
serverOutput.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Assert.assertEquals(data1.length + data2.length, response.getContent().length);
server.close();
} }
@Test @Test
@ -294,60 +294,60 @@ public class SslBytesClientTest extends SslBytesTest
Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS)); Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS));
final SSLSocket server = (SSLSocket)acceptor.accept(); try (SSLSocket server = (SSLSocket)acceptor.accept())
server.setUseClientMode(false);
Future<Object> handshake = threadPool.submit(() ->
{ {
server.startHandshake(); server.setUseClientMode(false);
return null;
});
SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); Future<Object> handshake = threadPool.submit(() ->
Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); {
server.startHandshake();
return null;
});
// Read request SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow();
InputStream serverInput = server.getInputStream(); Assert.assertNull(handshake.get(5, TimeUnit.SECONDS));
BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8));
String line = reader.readLine();
Assert.assertTrue(line.startsWith("GET"));
while (line.length() > 0)
line = reader.readLine();
OutputStream serverOutput = server.getOutputStream(); // Read request
byte[] data1 = new byte[1024]; InputStream serverInput = server.getInputStream();
Arrays.fill(data1, (byte)'X'); BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8));
String content1 = new String(data1, StandardCharsets.UTF_8); String line = reader.readLine();
byte[] data2 = new byte[1024]; Assert.assertTrue(line.startsWith("GET"));
Arrays.fill(data2, (byte)'Y'); while (line.length() > 0)
final String content2 = new String(data2, StandardCharsets.UTF_8); line = reader.readLine();
// Write first part of the response
serverOutput.write(("HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1).getBytes(StandardCharsets.UTF_8));
serverOutput.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
// Renegotiate OutputStream serverOutput = server.getOutputStream();
threadPool.submit(() -> byte[] data1 = new byte[1024];
{ Arrays.fill(data1, (byte)'X');
server.startHandshake(); String content1 = new String(data1, StandardCharsets.UTF_8);
return null; byte[] data2 = new byte[1024];
}); Arrays.fill(data2, (byte)'Y');
final String content2 = new String(data2, StandardCharsets.UTF_8);
// Write first part of the response
serverOutput.write(("HTTP/1.1 200 OK\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Length: " + (content1.length() + content2.length()) + "\r\n" +
"\r\n" +
content1).getBytes(StandardCharsets.UTF_8));
serverOutput.flush();
Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS));
// Renegotiation Handshake // Renegotiate
TLSRecord record = proxy.readFromServer(); threadPool.submit(() ->
Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); {
proxy.flushToClient(record); server.startHandshake();
return null;
});
// Client sends close alert. // Renegotiation Handshake
record = proxy.readFromClient(); TLSRecord record = proxy.readFromServer();
Assert.assertEquals(TLSRecord.Type.ALERT, record.getType()); Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType());
record = proxy.readFromClient(); proxy.flushToClient(record);
Assert.assertNull(record);
server.close(); // Client sends close alert.
record = proxy.readFromClient();
Assert.assertEquals(TLSRecord.Type.ALERT, record.getType());
record = proxy.readFromClient();
Assert.assertNull(record);
}
} }
} }

View File

@ -54,22 +54,24 @@ public class InvalidServerTest extends AbstractTest
} }
}, promise); }, promise);
Socket socket = server.accept(); try (Socket socket = server.accept())
OutputStream output = socket.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
Session session = promise.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(session);
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{ {
int read = input.read(); OutputStream output = socket.getOutputStream();
if (read < 0) output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
break;
Session session = promise.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(session);
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
} }
} }
} }

View File

@ -18,14 +18,6 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
@ -52,10 +44,17 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class IOTest public class IOTest
{ {
@Test @Test
@ -73,325 +72,312 @@ public class IOTest
@Test @Test
public void testHalfClose() throws Exception public void testHalfClose() throws Exception
{ {
ServerSocket connector = new ServerSocket(0); try (ServerSocket connector = new ServerSocket(0);
Socket client = new Socket("localhost", connector.getLocalPort());
Socket client = new Socket("localhost", connector.getLocalPort()); Socket server = connector.accept())
Socket server = connector.accept();
// we can write both ways
client.getOutputStream().write(1);
assertEquals(1, server.getInputStream().read());
server.getOutputStream().write(1);
assertEquals(1, client.getInputStream().read());
// shutdown output results in read -1
client.shutdownOutput();
assertEquals(-1, server.getInputStream().read());
// Even though EOF has been read, the server input is not seen as shutdown
assertFalse(server.isInputShutdown());
// and we can read -1 again
assertEquals(-1, server.getInputStream().read());
// but cannot write
try
{ {
// we can write both ways
client.getOutputStream().write(1); client.getOutputStream().write(1);
fail("exception expected"); assertEquals(1, server.getInputStream().read());
}
catch (SocketException e)
{
}
// but can still write in opposite direction.
server.getOutputStream().write(1);
assertEquals(1, client.getInputStream().read());
// server can shutdown input to match the shutdown out of client
server.shutdownInput();
// now we EOF instead of reading -1
try
{
server.getInputStream().read();
fail("exception expected");
}
catch (SocketException e)
{
}
// but can still write in opposite direction.
server.getOutputStream().write(1);
assertEquals(1, client.getInputStream().read());
// client can shutdown input
client.shutdownInput();
// now we EOF instead of reading -1
try
{
client.getInputStream().read();
fail("exception expected");
}
catch (SocketException e)
{
}
// But we can still write at the server (data which will never be read)
server.getOutputStream().write(1);
// and the server output is not shutdown
assertFalse(server.isOutputShutdown());
// until we explictly shut it down
server.shutdownOutput();
// and now we can't write
try
{
server.getOutputStream().write(1); server.getOutputStream().write(1);
fail("exception expected"); assertEquals(1, client.getInputStream().read());
// shutdown output results in read -1
client.shutdownOutput();
assertEquals(-1, server.getInputStream().read());
// Even though EOF has been read, the server input is not seen as shutdown
assertFalse(server.isInputShutdown());
// and we can read -1 again
assertEquals(-1, server.getInputStream().read());
// but cannot write
try
{
client.getOutputStream().write(1);
fail("exception expected");
}
catch (SocketException expected)
{
}
// but can still write in opposite direction.
server.getOutputStream().write(1);
assertEquals(1, client.getInputStream().read());
// server can shutdown input to match the shutdown out of client
server.shutdownInput();
// now we EOF instead of reading -1
try
{
server.getInputStream().read();
fail("exception expected");
}
catch (SocketException expected)
{
}
// but can still write in opposite direction.
server.getOutputStream().write(1);
assertEquals(1, client.getInputStream().read());
// client can shutdown input
client.shutdownInput();
// now we EOF instead of reading -1
try
{
client.getInputStream().read();
fail("exception expected");
}
catch (SocketException expected)
{
}
// But we can still write at the server (data which will never be read)
server.getOutputStream().write(1);
// and the server output is not shutdown
assertFalse(server.isOutputShutdown());
// until we explictly shut it down
server.shutdownOutput();
// and now we can't write
try
{
server.getOutputStream().write(1);
fail("exception expected");
}
catch (SocketException expected)
{
}
// but the sockets are still open
assertFalse(client.isClosed());
assertFalse(server.isClosed());
// but if we close one end
client.close();
// it is seen as closed.
assertTrue(client.isClosed());
// but not the other end
assertFalse(server.isClosed());
// which has to be closed explictly
server.close();
assertTrue(server.isClosed());
} }
catch (SocketException e)
{
}
// but the sockets are still open
assertFalse(client.isClosed());
assertFalse(server.isClosed());
// but if we close one end
client.close();
// it is seen as closed.
assertTrue(client.isClosed());
// but not the other end
assertFalse(server.isClosed());
// which has to be closed explictly
server.close();
assertTrue(server.isClosed());
} }
@Test @Test
public void testHalfCloseClientServer() throws Exception public void testHalfCloseClientServer() throws Exception
{ {
ServerSocketChannel connector = ServerSocketChannel.open(); try (ServerSocketChannel connector = ServerSocketChannel.open())
connector.socket().bind(null);
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
client.setSoTimeout(1000);
client.setSoLinger(false, -1);
Socket server = connector.accept().socket();
server.setSoTimeout(1000);
server.setSoLinger(false, -1);
// Write from client to server
client.getOutputStream().write(1);
// Server reads
assertEquals(1, server.getInputStream().read());
// Write from server to client with oshut
server.getOutputStream().write(1);
// System.err.println("OSHUT "+server);
server.shutdownOutput();
// Client reads response
assertEquals(1, client.getInputStream().read());
try
{ {
// Client reads -1 and does ishut connector.socket().bind(null);
assertEquals(-1, client.getInputStream().read()); try (Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket())
assertFalse(client.isInputShutdown());
//System.err.println("ISHUT "+client);
client.shutdownInput();
// Client ???
//System.err.println("OSHUT "+client);
client.shutdownOutput();
//System.err.println("CLOSE "+client);
client.close();
// Server reads -1, does ishut and then close
assertEquals(-1, server.getInputStream().read());
assertFalse(server.isInputShutdown());
//System.err.println("ISHUT "+server);
try
{ {
server.shutdownInput(); client.setSoTimeout(1000);
} client.setSoLinger(false, -1);
catch (SocketException e) try (Socket server = connector.accept().socket())
{ {
// System.err.println(e); server.setSoTimeout(1000);
} server.setSoLinger(false, -1);
//System.err.println("CLOSE "+server);
server.close();
} // Write from client to server
catch (Exception e) client.getOutputStream().write(1);
{
System.err.println(e); // Server reads
assertTrue(OS.IS_OSX); assertEquals(1, server.getInputStream().read());
// Write from server to client with oshut
server.getOutputStream().write(1);
// System.err.println("OSHUT "+server);
server.shutdownOutput();
// Client reads response
assertEquals(1, client.getInputStream().read());
try
{
// Client reads -1 and does ishut
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
//System.err.println("ISHUT "+client);
client.shutdownInput();
// Client ???
//System.err.println("OSHUT "+client);
client.shutdownOutput();
//System.err.println("CLOSE "+client);
client.close();
// Server reads -1, does ishut and then close
assertEquals(-1, server.getInputStream().read());
assertFalse(server.isInputShutdown());
//System.err.println("ISHUT "+server);
server.shutdownInput();
server.close();
}
catch (Exception e)
{
e.printStackTrace();
assertTrue(OS.IS_OSX);
}
}
}
} }
} }
@Test @Test
public void testHalfCloseBadClient() throws Exception public void testHalfCloseBadClient() throws Exception
{ {
ServerSocketChannel connector = ServerSocketChannel.open(); try (ServerSocketChannel connector = ServerSocketChannel.open())
connector.socket().bind(null);
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
client.setSoTimeout(1000);
client.setSoLinger(false, -1);
Socket server = connector.accept().socket();
server.setSoTimeout(1000);
server.setSoLinger(false, -1);
// Write from client to server
client.getOutputStream().write(1);
// Server reads
assertEquals(1, server.getInputStream().read());
// Write from server to client with oshut
server.getOutputStream().write(1);
//System.err.println("OSHUT "+server);
server.shutdownOutput();
try
{ {
// Client reads response connector.socket().bind(null);
assertEquals(1, client.getInputStream().read());
// Client reads -1 try (Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket())
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
// Client can still write as we are half closed
client.getOutputStream().write(1);
// Server can still read
assertEquals(1, server.getInputStream().read());
// Server now closes
server.close();
// Client still reads -1 (not broken pipe !!)
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
Thread.sleep(100);
// Client still reads -1 (not broken pipe !!)
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
// Client can still write data even though server is closed???
client.getOutputStream().write(1);
// Client eventually sees Broken Pipe
int i = 0;
try
{ {
for (i = 0; i < 100000; i++) client.setSoTimeout(1000);
client.setSoLinger(false, -1);
try (Socket server = connector.accept().socket())
{
server.setSoTimeout(1000);
server.setSoLinger(false, -1);
// Write from client to server
client.getOutputStream().write(1); client.getOutputStream().write(1);
Assert.fail(); // Server reads
} assertEquals(1, server.getInputStream().read());
catch (IOException e)
{
}
client.close();
} // Write from server to client with oshut
catch (Exception e) server.getOutputStream().write(1);
{ //System.err.println("OSHUT "+server);
System.err.println("PLEASE INVESTIGATE:"); server.shutdownOutput();
e.printStackTrace();
// Client reads response
assertEquals(1, client.getInputStream().read());
// Client reads -1
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
// Client can still write as we are half closed
client.getOutputStream().write(1);
// Server can still read
assertEquals(1, server.getInputStream().read());
// Server now closes
server.close();
// Client still reads -1 (not broken pipe !!)
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
Thread.sleep(100);
// Client still reads -1 (not broken pipe !!)
assertEquals(-1, client.getInputStream().read());
assertFalse(client.isInputShutdown());
// Client can still write data even though server is closed???
client.getOutputStream().write(1);
// Client eventually sees Broken Pipe
try
{
for (int i = 0; i < 100000; i++)
client.getOutputStream().write(1);
Assert.fail();
}
catch (IOException expected)
{
}
}
}
} }
} }
@Test @Test
public void testServerChannelInterrupt() throws Exception public void testServerChannelInterrupt() throws Exception
{ {
final ServerSocketChannel connector = ServerSocketChannel.open(); try (ServerSocketChannel connector = ServerSocketChannel.open())
connector.configureBlocking(true);
connector.socket().bind(null);
Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket();
client.setSoTimeout(2000);
client.setSoLinger(false, -1);
Socket server = connector.accept().socket();
server.setSoTimeout(2000);
server.setSoLinger(false, -1);
// Write from client to server
client.getOutputStream().write(1);
// Server reads
assertEquals(1, server.getInputStream().read());
// Write from server to client
server.getOutputStream().write(1);
// Client reads
assertEquals(1, client.getInputStream().read());
// block a thread in accept
final CountDownLatch alatch=new CountDownLatch(2);
Thread acceptor = new Thread()
{ {
@Override connector.configureBlocking(true);
public void run() connector.socket().bind(null);
try (Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket())
{ {
try client.setSoTimeout(2000);
client.setSoLinger(false, -1);
try (Socket server = connector.accept().socket())
{ {
alatch.countDown(); server.setSoTimeout(2000);
connector.accept(); server.setSoLinger(false, -1);
}
catch (Throwable e) // Write from client to server
{ client.getOutputStream().write(1);
} // Server reads
finally assertEquals(1, server.getInputStream().read());
{
alatch.countDown(); // Write from server to client
server.getOutputStream().write(1);
// Client reads
assertEquals(1, client.getInputStream().read());
// block a thread in accept
final CountDownLatch latch = new CountDownLatch(2);
Thread acceptor = new Thread(() ->
{
try
{
latch.countDown();
connector.accept();
}
catch (Throwable ignored)
{
}
finally
{
latch.countDown();
}
});
acceptor.start();
while (latch.getCount() == 2)
Thread.sleep(10);
// interrupt the acceptor
acceptor.interrupt();
// wait for acceptor to exit
assertTrue(latch.await(10, TimeUnit.SECONDS));
// connector is closed
assertFalse(connector.isOpen());
// but connection is still open
assertFalse(client.isClosed());
assertFalse(server.isClosed());
// Write from client to server
client.getOutputStream().write(42);
// Server reads
assertEquals(42, server.getInputStream().read());
// Write from server to client
server.getOutputStream().write(43);
// Client reads
assertEquals(43, client.getInputStream().read());
} }
} }
}; }
acceptor.start();
while (alatch.getCount()==2)
Thread.sleep(10);
// interrupt the acceptor
acceptor.interrupt();
// wait for acceptor to exit
assertTrue(alatch.await(10,TimeUnit.SECONDS));
// connector is closed
assertFalse(connector.isOpen());
// but connection is still open
assertFalse(client.isClosed());
assertFalse(server.isClosed());
// Write from client to server
client.getOutputStream().write(42);
// Server reads
assertEquals(42, server.getInputStream().read());
// Write from server to client
server.getOutputStream().write(43);
// Client reads
assertEquals(43, client.getInputStream().read());
client.close();
} }
@ -401,7 +387,7 @@ public class IOTest
{ {
try (ServerSocket connector = new ServerSocket(0); try (ServerSocket connector = new ServerSocket(0);
Socket client = new Socket("127.0.0.1", connector.getLocalPort()); Socket client = new Socket("127.0.0.1", connector.getLocalPort());
Socket server = connector.accept();) Socket server = connector.accept())
{ {
client.setTcpNoDelay(true); client.setTcpNoDelay(true);
client.setSoLinger(true, 0); client.setSoLinger(true, 0);
@ -430,8 +416,8 @@ public class IOTest
assertEquals(-1, server.getInputStream().read()); assertEquals(-1, server.getInputStream().read());
server.shutdownInput(); server.shutdownInput();
// Since output was already shutdown, server closes // Since output was already shutdown, server
server.close(); // closes in the try-with-resources block end.
} }
} }
@ -490,52 +476,48 @@ public class IOTest
assertEquals(expected,wrote); assertEquals(expected,wrote);
for (int i=0;i<buffers.length;i++) for (ByteBuffer buffer : buffers)
assertEquals(0,buffers[i].remaining()); assertEquals(0, buffer.remaining());
} }
@Test @Test
public void testSelectorWakeup() throws Exception public void testSelectorWakeup() throws Exception
{ {
ServerSocketChannel connector = ServerSocketChannel.open(); try (ServerSocketChannel connector = ServerSocketChannel.open())
connector.bind(null);
InetSocketAddress addr=(InetSocketAddress)connector.getLocalAddress();
SocketChannel client = SocketChannel.open();
client.connect(new InetSocketAddress("127.0.0.1",addr.getPort()));
SocketChannel server = connector.accept();
server.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey key = server.register(selector,SelectionKey.OP_READ);
assertThat(key,notNullValue());
assertThat(selector.selectNow(), is(0));
// Test wakeup before select
selector.wakeup();
assertThat(selector.select(), is(0));
// Test wakeup after select
new Thread()
{ {
@Override connector.bind(null);
public void run() InetSocketAddress addr = (InetSocketAddress)connector.getLocalAddress();
try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", addr.getPort()));
SocketChannel server = connector.accept())
{ {
try server.configureBlocking(false);
{
Thread.sleep(100);
selector.wakeup();
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start(); Selector selector = Selector.open();
assertThat(selector.select(), is(0)); SelectionKey key = server.register(selector, SelectionKey.OP_READ);
assertThat(key, notNullValue());
assertThat(selector.selectNow(), is(0));
// Test wakeup before select
selector.wakeup();
assertThat(selector.select(), is(0));
// Test wakeup after select
new Thread(() ->
{
try
{
Thread.sleep(100);
selector.wakeup();
}
catch (Exception e)
{
e.printStackTrace();
}
}).start();
assertThat(selector.select(), is(0));
}
}
} }
} }

View File

@ -18,10 +18,6 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -31,106 +27,105 @@ import java.nio.channels.SocketChannel;
import org.junit.Test; import org.junit.Test;
/** import static org.junit.Assert.assertEquals;
* import static org.junit.Assert.assertFalse;
*/ import static org.junit.Assert.assertTrue;
public class NIOTest public class NIOTest
{ {
@Test @Test
public void testSelector() throws Exception public void testSelector() throws Exception
{ {
ServerSocket acceptor = new ServerSocket(0); try (ServerSocket acceptor = new ServerSocket(0);
Selector selector = Selector.open();
SocketChannel client = SocketChannel.open(acceptor.getLocalSocketAddress());
Socket server = acceptor.accept())
{
server.setTcpNoDelay(true);
Selector selector = Selector.open(); // Make the client non blocking and register it with selector for reads
client.configureBlocking(false);
SelectionKey key = client.register(selector, SelectionKey.OP_READ);
// Create client server socket pair // assert it is not selected
SocketChannel client = SocketChannel.open(acceptor.getLocalSocketAddress()); assertTrue(key.isValid());
Socket server = acceptor.accept(); assertFalse(key.isReadable());
server.setTcpNoDelay(true); assertEquals(0, key.readyOps());
// Make the client non blocking and register it with selector for reads // try selecting and assert nothing selected
client.configureBlocking(false); int selected = selector.selectNow();
SelectionKey key = client.register(selector,SelectionKey.OP_READ); assertEquals(0, selected);
assertEquals(0, selector.selectedKeys().size());
assertTrue(key.isValid());
assertFalse(key.isReadable());
assertEquals(0, key.readyOps());
// assert it is not selected // Write a byte from server to client
assertTrue(key.isValid()); server.getOutputStream().write(42);
assertFalse(key.isReadable()); server.getOutputStream().flush();
assertEquals(0,key.readyOps());
// try selecting and assert nothing selected // select again and assert selection found for read
int selected = selector.selectNow(); selected = selector.select(1000);
assertEquals(0,selected); assertEquals(1, selected);
assertEquals(0,selector.selectedKeys().size()); assertEquals(1, selector.selectedKeys().size());
assertTrue(key.isValid()); assertTrue(key.isValid());
assertFalse(key.isReadable()); assertTrue(key.isReadable());
assertEquals(0,key.readyOps()); assertEquals(1, key.readyOps());
// Write a byte from server to client // select again and see that it is not reselect, but stays selected
server.getOutputStream().write(42); selected = selector.select(100);
server.getOutputStream().flush(); assertEquals(0, selected);
assertEquals(1, selector.selectedKeys().size());
assertTrue(key.isValid());
assertTrue(key.isReadable());
assertEquals(1, key.readyOps());
// select again and assert selection found for read // read the byte
selected = selector.select(1000); ByteBuffer buf = ByteBuffer.allocate(1024);
assertEquals(1,selected); int len = client.read(buf);
assertEquals(1,selector.selectedKeys().size()); assertEquals(1, len);
assertTrue(key.isValid()); buf.flip();
assertTrue(key.isReadable()); assertEquals(42, buf.get());
assertEquals(1,key.readyOps()); buf.clear();
// select again and see that it is not reselect, but stays selected // But this does not change the key
selected = selector.select(100); assertTrue(key.isValid());
assertEquals(0,selected); assertTrue(key.isReadable());
assertEquals(1,selector.selectedKeys().size()); assertEquals(1, key.readyOps());
assertTrue(key.isValid());
assertTrue(key.isReadable());
assertEquals(1,key.readyOps());
// read the byte // Even if we select again ?
ByteBuffer buf = ByteBuffer.allocate(1024); selected = selector.select(100);
int len=client.read(buf); assertEquals(0, selected);
assertEquals(1,len); assertEquals(1, selector.selectedKeys().size());
buf.flip(); assertTrue(key.isValid());
assertEquals(42,buf.get()); assertTrue(key.isReadable());
buf.clear(); assertEquals(1, key.readyOps());
// But this does not change the key // Unless we remove the key from the select set
assertTrue(key.isValid()); // and then it is still flagged as isReadable()
assertTrue(key.isReadable()); selector.selectedKeys().clear();
assertEquals(1,key.readyOps()); assertEquals(0, selector.selectedKeys().size());
assertTrue(key.isValid());
assertTrue(key.isReadable());
assertEquals(1, key.readyOps());
// Even if we select again ? // Now if we select again - it is still flagged as readable!!!
selected = selector.select(100); selected = selector.select(100);
assertEquals(0,selected); assertEquals(0, selected);
assertEquals(1,selector.selectedKeys().size()); assertEquals(0, selector.selectedKeys().size());
assertTrue(key.isValid()); assertTrue(key.isValid());
assertTrue(key.isReadable()); assertTrue(key.isReadable());
assertEquals(1,key.readyOps()); assertEquals(1, key.readyOps());
// Unless we remove the key from the select set // Only when it is selected for something else does that state change.
// and then it is still flagged as isReadable() key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.selectedKeys().clear(); selected = selector.select(1000);
assertEquals(0,selector.selectedKeys().size()); assertEquals(1, selected);
assertTrue(key.isValid()); assertEquals(1, selector.selectedKeys().size());
assertTrue(key.isReadable()); assertTrue(key.isValid());
assertEquals(1,key.readyOps()); assertTrue(key.isWritable());
assertFalse(key.isReadable());
// Now if we select again - it is still flagged as readable!!! assertEquals(SelectionKey.OP_WRITE, key.readyOps());
selected = selector.select(100); }
assertEquals(0,selected);
assertEquals(0,selector.selectedKeys().size());
assertTrue(key.isValid());
assertTrue(key.isReadable());
assertEquals(1,key.readyOps());
// Only when it is selected for something else does that state change.
key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
selected = selector.select(1000);
assertEquals(1,selected);
assertEquals(1,selector.selectedKeys().size());
assertTrue(key.isValid());
assertTrue(key.isWritable());
assertFalse(key.isReadable());
assertEquals(SelectionKey.OP_WRITE,key.readyOps());
} }
} }

View File

@ -65,7 +65,7 @@ public class SocketChannelEndPointInterestsTest
{ {
@Override @Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{ {
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()) SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler())
{ {
@ -176,36 +176,37 @@ public class SocketChannelEndPointInterestsTest
} }
}); });
Socket client = new Socket(); try (Socket client = new Socket())
client.connect(connector.getLocalAddress()); {
client.setSoTimeout(5000); client.connect(connector.getLocalAddress());
client.setSoTimeout(5000);
try (SocketChannel server = connector.accept())
{
server.configureBlocking(false);
selectorManager.accept(server);
SocketChannel server = connector.accept(); OutputStream clientOutput = client.getOutputStream();
server.configureBlocking(false); clientOutput.write(1);
selectorManager.accept(server); clientOutput.flush();
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
OutputStream clientOutput = client.getOutputStream(); // We do not read to keep the socket write blocked
clientOutput.write(1);
clientOutput.flush();
Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS));
// We do not read to keep the socket write blocked clientOutput.write(2);
clientOutput.flush();
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
clientOutput.write(2); // Sleep before reading to allow waking up the server only for read
clientOutput.flush(); Thread.sleep(1000);
Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS));
// Sleep before reading to allow waking up the server only for read // Now read what was written, waking up the server for write
Thread.sleep(1000); InputStream clientInput = client.getInputStream();
while (size.getAndDecrement() > 0)
clientInput.read();
// Now read what was written, waking up the server for write Assert.assertNull(failure.get());
InputStream clientInput = client.getInputStream(); }
while (size.getAndDecrement() > 0) }
clientInput.read();
client.close();
Assert.assertNull(failure.get());
} }
private interface Interested private interface Interested
@ -214,5 +215,4 @@ public class SocketChannelEndPointInterestsTest
void onIncompleteFlush(); void onIncompleteFlush();
} }
} }

View File

@ -18,13 +18,6 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
@ -73,6 +66,13 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class SocketChannelEndPointTest public class SocketChannelEndPointTest
@ -143,60 +143,61 @@ public class SocketChannelEndPointTest
@Test @Test
public void testEcho() throws Exception public void testEcho() throws Exception
{ {
Socket client = _scenario.newClient(_connector); try (Socket client = _scenario.newClient(_connector))
client.setSoTimeout(60000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); client.setSoTimeout(60000);
assertTrue(b > 0); try (SocketChannel server = _connector.accept())
assertEquals(c, (char) b); {
} server.configureBlocking(false);
_manager.accept(server);
// wait for read timeout // Write client to server
client.setSoTimeout(500); client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
try
{
client.getInputStream().read();
Assert.fail();
}
catch (SocketTimeoutException e)
{
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L));
}
// write then shutdown // Verify echo server to client
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
// Verify echo server to client // wait for read timeout
for (char c : "Goodbye Cruel TLS".toCharArray()) client.setSoTimeout(500);
{ long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
int b = client.getInputStream().read(); try
Assert.assertThat("expect valid char integer", b, greaterThan(0)); {
assertEquals("expect characters to be same", c, (char) b); client.getInputStream().read();
} Assert.fail();
client.close(); }
catch (SocketTimeoutException e)
{
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start;
Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L));
}
for (int i = 0; i < 10; ++i) // write then shutdown
{ client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8));
if (server.isOpen())
Thread.sleep(10); // Verify echo server to client
else for (char c : "Goodbye Cruel TLS".toCharArray())
break; {
int b = client.getInputStream().read();
Assert.assertThat("expect valid char integer", b, greaterThan(0));
assertEquals("expect characters to be same", c, (char)b);
}
client.close();
for (int i = 0; i < 10; ++i)
{
if (server.isOpen())
Thread.sleep(10);
else
break;
}
assertFalse(server.isOpen());
}
} }
assertFalse(server.isOpen());
} }
@Test @Test
@ -204,265 +205,265 @@ public class SocketChannelEndPointTest
{ {
assumeTrue("Scenario supports half-close", _scenario.supportsHalfCloses()); assumeTrue("Scenario supports half-close", _scenario.supportsHalfCloses());
Socket client = _scenario.newClient(_connector); try (Socket client = _scenario.newClient(_connector))
client.setSoTimeout(500);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); client.setSoTimeout(500);
assertTrue(b > 0); try (SocketChannel server = _connector.accept())
assertEquals(c, (char) b); {
} server.configureBlocking(false);
_manager.accept(server);
// wait for read timeout // Write client to server
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
try
{
client.getInputStream().read();
Assert.fail();
}
catch (SocketTimeoutException e)
{
assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start >= 400);
}
// write then shutdown // Verify echo server to client
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); for (char c : "HelloWorld".toCharArray())
client.shutdownOutput(); {
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
// Verify echo server to client // wait for read timeout
for (char c : "Goodbye Cruel TLS".toCharArray()) long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
{ try
int b = client.getInputStream().read(); {
assertTrue(b > 0); client.getInputStream().read();
assertEquals(c, (char) b); Assert.fail();
}
catch (SocketTimeoutException e)
{
assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start >= 400);
}
// write then shutdown
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8));
client.shutdownOutput();
// Verify echo server to client
for (char c : "Goodbye Cruel TLS".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
// Read close
assertEquals(-1, client.getInputStream().read());
}
} }
// Read close
assertEquals(-1, client.getInputStream().read());
} }
@Test @Test
public void testReadBlocked() throws Exception public void testReadBlocked() throws Exception
{ {
Socket client = _scenario.newClient(_connector); try (Socket client = _scenario.newClient(_connector);
SocketChannel server = _connector.accept())
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
OutputStream clientOutputStream = client.getOutputStream();
InputStream clientInputStream = client.getInputStream();
int specifiedTimeout = 1000;
client.setSoTimeout(specifiedTimeout);
// Write 8 and cause block waiting for 10
_blockAt.set(10);
clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
clientOutputStream.flush();
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndPoint.setIdleTimeout(10 * specifiedTimeout);
Thread.sleep((11 * specifiedTimeout) / 10);
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
try
{ {
int b = clientInputStream.read(); server.configureBlocking(false);
Assert.fail("Should have timed out waiting for a response, but read " + b); _manager.accept(server);
}
catch (SocketTimeoutException e)
{
int elapsed = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start).intValue();
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4));
}
// write remaining characters OutputStream clientOutputStream = client.getOutputStream();
clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8)); InputStream clientInputStream = client.getInputStream();
clientOutputStream.flush();
// Verify echo server to client int specifiedTimeout = 1000;
for (char c : "1234567890ABCDEF".toCharArray()) client.setSoTimeout(specifiedTimeout);
{
int b = clientInputStream.read(); // Write 8 and cause block waiting for 10
assertTrue(b > 0); _blockAt.set(10);
assertEquals(c, (char) b); clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
clientOutputStream.flush();
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndPoint.setIdleTimeout(10 * specifiedTimeout);
Thread.sleep((11 * specifiedTimeout) / 10);
long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
try
{
int b = clientInputStream.read();
Assert.fail("Should have timed out waiting for a response, but read " + b);
}
catch (SocketTimeoutException e)
{
int elapsed = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start).intValue();
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4));
}
// write remaining characters
clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8));
clientOutputStream.flush();
// Verify echo server to client
for (char c : "1234567890ABCDEF".toCharArray())
{
int b = clientInputStream.read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
} }
} }
@Test @Test
public void testStress() throws Exception public void testStress() throws Exception
{ {
Socket client = _scenario.newClient(_connector); try (Socket client = _scenario.newClient(_connector))
client.setSoTimeout(30000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
final int writes = 200000;
final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8);
byte[] count = "0\n".getBytes(StandardCharsets.UTF_8);
BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream());
final CountDownLatch latch = new CountDownLatch(writes);
final InputStream in = new BufferedInputStream(client.getInputStream());
final long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
out.write(bytes);
out.write(count);
out.flush();
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndPoint.setIdleTimeout(5000);
new Thread()
{ {
@Override client.setSoTimeout(30000);
public void run() try (SocketChannel server = _connector.accept())
{ {
Thread.currentThread().setPriority(MAX_PRIORITY); server.configureBlocking(false);
long last = -1; _manager.accept(server);
int count = -1; final int writes = 200000;
try
{
while (latch.getCount() > 0)
{
// Verify echo server to client
for (byte b0 : bytes)
{
int b = in.read();
Assert.assertThat(b, greaterThan(0));
assertEquals(0xff & b0, b);
}
count = 0; final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8);
int b = in.read(); byte[] count = "0\n".getBytes(StandardCharsets.UTF_8);
while (b > 0 && b != '\n') BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream());
{ final CountDownLatch latch = new CountDownLatch(writes);
count = count * 10 + (b - '0'); final InputStream in = new BufferedInputStream(client.getInputStream());
b = in.read(); final long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
} out.write(bytes);
last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); out.write(count);
//if (latch.getCount()%1000==0)
// System.out.println(writes-latch.getCount());
latch.countDown();
}
}
catch (Throwable e)
{
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
System.err.println("count=" + count);
System.err.println("latch=" + latch.getCount());
System.err.println("time=" + (now - start));
System.err.println("last=" + (now - last));
System.err.println("endp=" + _lastEndPoint);
System.err.println("conn=" + _lastEndPoint.getConnection());
e.printStackTrace();
}
}
}.start();
// Write client to server
for (int i = 1; i < writes; i++)
{
out.write(bytes);
out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1));
out.write('\n');
if (i % 1000 == 0)
{
//System.err.println(i+"/"+writes);
out.flush(); out.flush();
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndPoint.setIdleTimeout(5000);
new Thread(() ->
{
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
long last = -1;
int count1 = -1;
try
{
while (latch.getCount() > 0)
{
// Verify echo server to client
for (byte b0 : bytes)
{
int b = in.read();
Assert.assertThat(b, greaterThan(0));
assertEquals(0xff & b0, b);
}
count1 = 0;
int b = in.read();
while (b > 0 && b != '\n')
{
count1 = count1 * 10 + (b - '0');
b = in.read();
}
last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
//if (latch.getCount()%1000==0)
// System.out.println(writes-latch.getCount());
latch.countDown();
}
}
catch (Throwable e)
{
long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
System.err.println("count=" + count1);
System.err.println("latch=" + latch.getCount());
System.err.println("time=" + (now - start));
System.err.println("last=" + (now - last));
System.err.println("endp=" + _lastEndPoint);
System.err.println("conn=" + _lastEndPoint.getConnection());
e.printStackTrace();
}
}).start();
// Write client to server
for (int i = 1; i < writes; i++)
{
out.write(bytes);
out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1));
out.write('\n');
if (i % 1000 == 0)
{
//System.err.println(i+"/"+writes);
out.flush();
}
Thread.yield();
}
out.flush();
long last = latch.getCount();
while (!latch.await(5, TimeUnit.SECONDS))
{
//System.err.println(latch.getCount());
if (latch.getCount() == last)
Assert.fail();
last = latch.getCount();
}
assertEquals(0, latch.getCount());
} }
Thread.yield();
} }
out.flush();
long last = latch.getCount();
while (!latch.await(5, TimeUnit.SECONDS))
{
//System.err.println(latch.getCount());
if (latch.getCount() == last)
Assert.fail();
last = latch.getCount();
}
assertEquals(0, latch.getCount());
} }
@Test @Test
public void testWriteBlocked() throws Exception public void testWriteBlocked() throws Exception
{ {
Socket client = _scenario.newClient(_connector); try (Socket client = _scenario.newClient(_connector))
client.setSoTimeout(10000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
_writeCount.set(10000);
String data = "Now is the time for all good men to come to the aid of the party";
client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
BufferedInputStream in = new BufferedInputStream(client.getInputStream());
int byteNum = 0;
try
{ {
for (int i = 0; i < _writeCount.get(); i++) client.setSoTimeout(10000);
try (SocketChannel server = _connector.accept())
{ {
if (i % 1000 == 0) server.configureBlocking(false);
TimeUnit.MILLISECONDS.sleep(200); _manager.accept(server);
// Verify echo server to client // Write client to server
for (int j = 0; j < data.length(); j++) _writeCount.set(10000);
String data = "Now is the time for all good men to come to the aid of the party";
client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
BufferedInputStream in = new BufferedInputStream(client.getInputStream());
int byteNum = 0;
try
{ {
char c = data.charAt(j); for (int i = 0; i < _writeCount.get(); i++)
int b = in.read(); {
byteNum++; if (i % 1000 == 0)
assertTrue(b > 0); TimeUnit.MILLISECONDS.sleep(200);
assertEquals("test-" + i + "/" + j, c, (char) b);
// Verify echo server to client
for (int j = 0; j < data.length(); j++)
{
char c = data.charAt(j);
int b = in.read();
byteNum++;
assertTrue(b > 0);
assertEquals("test-" + i + "/" + j, c, (char)b);
}
if (i == 0)
_lastEndPoint.setIdleTimeout(60000);
}
}
catch (SocketTimeoutException e)
{
System.err.println("SelectorManager.dump() = " + _manager.dump());
LOG.warn("Server: " + server);
LOG.warn("Error reading byte #" + byteNum, e);
throw e;
} }
if (i == 0) client.close();
_lastEndPoint.setIdleTimeout(60000);
for (int i = 0; i < 10; ++i)
{
if (server.isOpen())
Thread.sleep(10);
else
break;
}
assertFalse(server.isOpen());
} }
} }
catch (SocketTimeoutException e)
{
System.err.println("SelectorManager.dump() = " + _manager.dump());
LOG.warn("Server: " + server);
LOG.warn("Error reading byte #" + byteNum, e);
throw e;
}
client.close();
for (int i = 0; i < 10; ++i)
{
if (server.isOpen())
Thread.sleep(10);
else
break;
}
assertFalse(server.isOpen());
} }
@ -482,7 +483,7 @@ public class SocketChannelEndPointTest
{ {
@Override @Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{ {
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
_lastEndPoint = endp; _lastEndPoint = endp;
@ -491,7 +492,7 @@ public class SocketChannelEndPointTest
} }
@Override @Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{ {
return new TestConnection(endpoint, latch, getExecutor(), _blockAt, _writeCount); return new TestConnection(endpoint, latch, getExecutor(), _blockAt, _writeCount);
} }
@ -507,18 +508,14 @@ public class SocketChannelEndPointTest
CountDownLatch closed = new CountDownLatch(20); CountDownLatch closed = new CountDownLatch(20);
for (int i = 0; i < 20; i++) for (int i = 0; i < 20; i++)
{ {
new Thread() new Thread(() ->
{ {
@Override try (Socket client = _scenario.newClient(_connector))
public void run()
{ {
try (Socket client = _scenario.newClient(_connector);) client.setSoTimeout(5000);
try (SocketChannel server = _connector.accept())
{ {
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false); server.configureBlocking(false);
_manager.accept(server); _manager.accept(server);
// Write client to server // Write client to server
@ -531,26 +528,26 @@ public class SocketChannelEndPointTest
{ {
int b = client.getInputStream().read(); int b = client.getInputStream().read();
assertTrue(b > 0); assertTrue(b > 0);
assertEquals(c, (char) b); assertEquals(c, (char)b);
} }
assertEquals(-1, client.getInputStream().read()); assertEquals(-1, client.getInputStream().read());
echoed.incrementAndGet(); echoed.incrementAndGet();
} }
catch (SocketTimeoutException x)
{
x.printStackTrace();
timeout.incrementAndGet();
}
catch (Throwable x)
{
rejections.incrementAndGet();
}
finally
{
closed.countDown();
}
} }
}.start(); catch (SocketTimeoutException x)
{
x.printStackTrace();
timeout.incrementAndGet();
}
catch (Throwable x)
{
rejections.incrementAndGet();
}
finally
{
closed.countDown();
}
}).start();
} }
// unblock the handling // unblock the handling
@ -572,25 +569,25 @@ public class SocketChannelEndPointTest
try (Socket client = _scenario.newClient(_connector)) try (Socket client = _scenario.newClient(_connector))
{ {
client.setSoTimeout(5000); client.setSoTimeout(5000);
try (SocketChannel server = _connector.accept())
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
client.getOutputStream().flush();
client.shutdownOutput();
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{ {
int b = client.getInputStream().read(); server.configureBlocking(false);
assertTrue(b > 0); _manager.accept(server);
assertEquals(c, (char) b);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
client.getOutputStream().flush();
client.shutdownOutput();
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
assertEquals(-1, client.getInputStream().read());
} }
assertEquals(-1, client.getInputStream().read());
} }
} }
@ -601,7 +598,7 @@ public class SocketChannelEndPointTest
super(executor, scheduler); super(executor, scheduler);
} }
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{ {
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(60000); endp.setIdleTimeout(60000);
@ -611,7 +608,7 @@ public class SocketChannelEndPointTest
} }
@Override @Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{ {
return _scenario.newConnection(channel, endpoint, getExecutor(), _blockAt, _writeCount); return _scenario.newConnection(channel, endpoint, getExecutor(), _blockAt, _writeCount);
} }

View File

@ -91,7 +91,7 @@ public class SslConnectionTest
@Override @Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{ {
SocketChannelEndPoint endp = new TestEP(channel, selector, selectionKey, getScheduler()); SocketChannelEndPoint endp = new TestEP(channel, selector, selectionKey, getScheduler());
endp.setIdleTimeout(60000); endp.setIdleTimeout(60000);
@ -126,8 +126,7 @@ public class SslConnectionTest
return false; return false;
} }
} }
boolean flushed=super.flush(buffers); return super.flush(buffers);
return flushed;
} }
} }
@ -186,15 +185,7 @@ public class SslConnectionTest
fillInterested(); fillInterested();
else else
{ {
getExecutor().execute(new Runnable() getExecutor().execute(() -> getEndPoint().write(_writeCallback,BufferUtil.toBuffer("Hello Client")));
{
@Override
public void run()
{
getEndPoint().write(_writeCallback,BufferUtil.toBuffer("Hello Client"));
}
});
} }
} }
@ -264,52 +255,54 @@ public class SslConnectionTest
@Test @Test
public void testHelloWorld() throws Exception public void testHelloWorld() throws Exception
{ {
Socket client = newClient(); try (Socket client = newClient())
client.setSoTimeout(60000); {
client.setSoTimeout(60000);
try (SocketChannel server = _connector.accept())
{
server.configureBlocking(false);
_manager.accept(server);
SocketChannel server = _connector.accept(); client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8));
server.configureBlocking(false); byte[] buffer = new byte[1024];
_manager.accept(server); int len = client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); _dispatches.set(0);
byte[] buffer = new byte[1024]; client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
int len=client.getInputStream().read(buffer); len = 5;
Assert.assertEquals(5, len); while (len > 0)
Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8)); len -= client.getInputStream().read(buffer);
}
_dispatches.set(0); }
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
len=5;
while(len>0)
len-=client.getInputStream().read(buffer);
client.close();
} }
@Test @Test
public void testRenegotiate() throws Exception public void testRenegotiate() throws Exception
{ {
SSLSocket client = newClient(); try (SSLSocket client = newClient())
client.setSoTimeout(60000); {
client.setSoTimeout(60000);
try (SocketChannel server = _connector.accept())
{
server.configureBlocking(false);
_manager.accept(server);
SocketChannel server = _connector.accept(); client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8));
server.configureBlocking(false); byte[] buffer = new byte[1024];
_manager.accept(server); int len = client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); client.startHandshake();
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8));
client.startHandshake(); client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
len = client.getInputStream().read(buffer);
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals(5, len);
len=client.getInputStream().read(buffer); Assert.assertEquals("World", new String(buffer, 0, len, StandardCharsets.UTF_8));
Assert.assertEquals(5, len); }
Assert.assertEquals("World",new String(buffer,0,len,StandardCharsets.UTF_8)); }
client.close();
} }
@Test @Test
@ -317,30 +310,33 @@ public class SslConnectionTest
{ {
__sslCtxFactory.setRenegotiationAllowed(false); __sslCtxFactory.setRenegotiationAllowed(false);
SSLSocket client = newClient(); try (SSLSocket client = newClient())
client.setSoTimeout(60000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8));
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
try
{ {
client.getInputStream().read(buffer); client.setSoTimeout(60000);
Assert.fail(); try (SocketChannel server = _connector.accept())
} {
catch(SSLException e) server.configureBlocking(false);
{ _manager.accept(server);
// expected
client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8));
byte[] buffer = new byte[1024];
int len = client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
try
{
client.getInputStream().read(buffer);
Assert.fail();
}
catch (SSLException e)
{
// expected
}
}
} }
} }
@ -350,152 +346,149 @@ public class SslConnectionTest
__sslCtxFactory.setRenegotiationAllowed(true); __sslCtxFactory.setRenegotiationAllowed(true);
__sslCtxFactory.setRenegotiationLimit(2); __sslCtxFactory.setRenegotiationLimit(2);
SSLSocket client = newClient(); try (SSLSocket client = newClient())
client.setSoTimeout(60000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
client.getOutputStream().write("Good".getBytes(StandardCharsets.UTF_8));
byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer);
Assert.assertEquals(4, len);
Assert.assertEquals("Good",new String(buffer,0,len,StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("Bye".getBytes(StandardCharsets.UTF_8));
len=client.getInputStream().read(buffer);
Assert.assertEquals(3, len);
Assert.assertEquals("Bye",new String(buffer,0,len,StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("Cruel".getBytes(StandardCharsets.UTF_8));
len=client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Cruel",new String(buffer,0,len,StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
try
{ {
client.getInputStream().read(buffer); client.setSoTimeout(60000);
Assert.fail(); try (SocketChannel server = _connector.accept())
} {
catch(SSLException e) server.configureBlocking(false);
{ _manager.accept(server);
// expected
client.getOutputStream().write("Good".getBytes(StandardCharsets.UTF_8));
byte[] buffer = new byte[1024];
int len = client.getInputStream().read(buffer);
Assert.assertEquals(4, len);
Assert.assertEquals("Good", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("Bye".getBytes(StandardCharsets.UTF_8));
len = client.getInputStream().read(buffer);
Assert.assertEquals(3, len);
Assert.assertEquals("Bye", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("Cruel".getBytes(StandardCharsets.UTF_8));
len = client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Cruel", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.startHandshake();
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
try
{
client.getInputStream().read(buffer);
Assert.fail();
}
catch (SSLException e)
{
// expected
}
}
} }
} }
@Test @Test
public void testWriteOnConnect() throws Exception public void testWriteOnConnect() throws Exception
{ {
_testFill=false; _testFill=false;
_writeCallback = new FutureCallback(); _writeCallback = new FutureCallback();
Socket client = newClient();
client.setSoTimeout(10000);
SocketChannel server = _connector.accept(); try (Socket client = newClient())
server.configureBlocking(false); {
_manager.accept(server); client.setSoTimeout(10000);
try (SocketChannel server = _connector.accept())
{
server.configureBlocking(false);
_manager.accept(server);
byte[] buffer = new byte[1024]; byte[] buffer = new byte[1024];
int len=client.getInputStream().read(buffer); int len = client.getInputStream().read(buffer);
Assert.assertEquals("Hello Client",new String(buffer,0,len,StandardCharsets.UTF_8)); Assert.assertEquals("Hello Client", new String(buffer, 0, len, StandardCharsets.UTF_8));
Assert.assertEquals(null,_writeCallback.get(100,TimeUnit.MILLISECONDS)); Assert.assertNull(_writeCallback.get(100, TimeUnit.MILLISECONDS));
client.close(); }
}
} }
@Test @Test
public void testBlockedWrite() throws Exception public void testBlockedWrite() throws Exception
{ {
Socket client = newClient(); try (Socket client = newClient())
client.setSoTimeout(5000); {
client.setSoTimeout(5000);
try (SocketChannel server = _connector.accept())
{
server.configureBlocking(false);
_manager.accept(server);
SocketChannel server = _connector.accept(); __startBlocking.set(5);
server.configureBlocking(false); __blockFor.set(3);
_manager.accept(server);
__startBlocking.set(5); client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8));
__blockFor.set(3); byte[] buffer = new byte[1024];
int len = client.getInputStream().read(buffer);
Assert.assertEquals(5, len);
Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8));
client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); _dispatches.set(0);
byte[] buffer = new byte[1024]; client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8));
int len=client.getInputStream().read(buffer); len = 5;
Assert.assertEquals(5, len); while (len > 0)
Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8)); len -= client.getInputStream().read(buffer);
Assert.assertEquals(0, len);
_dispatches.set(0); }
client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); }
len=5;
while(len>0)
len-=client.getInputStream().read(buffer);
Assert.assertEquals(0, len);
client.close();
} }
@Test @Test
public void testManyLines() throws Exception public void testManyLines() throws Exception
{ {
final Socket client = newClient(); try (Socket client = newClient())
client.setSoTimeout(10000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
final int LINES=20;
final CountDownLatch count=new CountDownLatch(LINES);
new Thread()
{ {
@Override client.setSoTimeout(10000);
public void run() try (SocketChannel server = _connector.accept())
{ {
try server.configureBlocking(false);
_manager.accept(server);
final int LINES = 20;
final CountDownLatch count = new CountDownLatch(LINES);
new Thread(() ->
{ {
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(),StandardCharsets.UTF_8)); try
while(count.getCount()>0)
{ {
String line=in.readLine(); BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
if (line==null) while (count.getCount() > 0)
break; {
// System.err.println(line); String line = in.readLine();
count.countDown(); if (line == null)
break;
// System.err.println(line);
count.countDown();
}
}
catch (IOException e)
{
e.printStackTrace();
}
}).start();
for (int i = 0; i < LINES; i++)
{
client.getOutputStream().write(("HelloWorld " + i + "\n").getBytes(StandardCharsets.UTF_8));
// System.err.println("wrote");
if (i % 1000 == 0)
{
client.getOutputStream().flush();
Thread.sleep(10);
} }
} }
catch(IOException e)
{
e.printStackTrace();
}
}
}.start();
for (int i=0;i<LINES;i++) Assert.assertTrue(count.await(20, TimeUnit.SECONDS));
{
client.getOutputStream().write(("HelloWorld "+i+"\n").getBytes(StandardCharsets.UTF_8));
// System.err.println("wrote");
if (i%1000==0)
{
client.getOutputStream().flush();
Thread.sleep(10);
} }
} }
Assert.assertTrue(count.await(20,TimeUnit.SECONDS));
client.close();
} }
} }