From 744f9054b390a8276d8f9d911da1571bf5764fe6 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 16 May 2018 18:40:10 +0200 Subject: [PATCH] Fixes #2547 - Review usages of ServerSocket[Channel].accept(). Signed-off-by: Simone Bordet --- .../jetty/client/HttpClientTLSTest.java | 62 +- .../client/ServerConnectionCloseTest.java | 103 +-- .../eclipse/jetty/client/Socks4ProxyTest.java | 116 ++-- .../client/TLSServerConnectionCloseTest.java | 146 ++-- .../jetty/client/ssl/SslBytesClientTest.java | 388 +++++------ .../jetty/http2/client/InvalidServerTest.java | 32 +- .../java/org/eclipse/jetty/io/IOTest.java | 648 +++++++++--------- .../java/org/eclipse/jetty/io/NIOTest.java | 171 +++-- .../SocketChannelEndPointInterestsTest.java | 52 +- .../jetty/io/SocketChannelEndPointTest.java | 637 +++++++++-------- .../eclipse/jetty/io/SslConnectionTest.java | 363 +++++----- 11 files changed, 1347 insertions(+), 1371 deletions(-) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index cf2e355d50e..220b81b5daa 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -479,39 +479,41 @@ public class HttpClientTLSTest latch.countDown(); }); - Socket socket = server.accept(); - SSLSocket sslSocket = (SSLSocket)serverTLSFactory.getSslContext().getSocketFactory().createSocket(socket, null, socket.getPort(), true); - sslSocket.setUseClientMode(false); - BufferedReader reader = new BufferedReader(new InputStreamReader(sslSocket.getInputStream(), StandardCharsets.UTF_8)); - while (true) + try (Socket socket = server.accept()) { - String line = reader.readLine(); - if (line == null || line.isEmpty()) - break; + SSLSocket sslSocket = (SSLSocket)serverTLSFactory.getSslContext().getSocketFactory().createSocket(socket, null, socket.getPort(), true); + sslSocket.setUseClientMode(false); + BufferedReader reader = new BufferedReader(new InputStreamReader(sslSocket.getInputStream(), StandardCharsets.UTF_8)); + while (true) + { + String line = reader.readLine(); + if (line == null || line.isEmpty()) + break; + } + + // If the response is Content-Length delimited, allowing the + // missing TLS Close Message is fine because the application + // will see a EOFException anyway. + // If the response is connection delimited, allowing the + // missing TLS Close Message is bad because the application + // will see a successful response with truncated content. + + // Verify that by not allowing the missing + // TLS Close Message we get a response failure. + + byte[] half = new byte[8]; + String response = "HTTP/1.1 200 OK\r\n" + +// "Content-Length: " + (half.length * 2) + "\r\n" + + "Connection: close\r\n" + + "\r\n"; + OutputStream output = sslSocket.getOutputStream(); + output.write(response.getBytes(StandardCharsets.UTF_8)); + output.write(half); + output.flush(); + // Simulate a truncation attack by raw closing + // the socket in the try-with-resources block end. } - // If the response is Content-Length delimited, allowing the - // missing TLS Close Message is fine because the application - // will see a EOFException anyway. - // If the response is connection delimited, allowing the - // missing TLS Close Message is bad because the application - // will see a successful response with truncated content. - - // Verify that by not allowing the missing - // TLS Close Message we get a response failure. - - byte[] half = new byte[8]; - String response = "HTTP/1.1 200 OK\r\n" + -// "Content-Length: " + (half.length * 2) + "\r\n" + - "Connection: close\r\n" + - "\r\n"; - OutputStream output = sslSocket.getOutputStream(); - output.write(response.getBytes(StandardCharsets.UTF_8)); - output.write(half); - output.flush(); - // Simulate a truncation attack by raw closing. - socket.close(); - Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java index 62536f246a8..5712d0cc6d2 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ServerConnectionCloseTest.java @@ -98,63 +98,66 @@ public class ServerConnectionCloseTest private void testServerSendsConnectionClose(boolean shutdownOutput, boolean chunked, String content) throws Exception { - ServerSocket server = new ServerSocket(0); - int port = server.getLocalPort(); - - startClient(); - - Request request = client.newRequest("localhost", port).path("/ctx/path"); - FutureResponseListener listener = new FutureResponseListener(request); - request.send(listener); - - Socket socket = server.accept(); - - InputStream input = socket.getInputStream(); - consumeRequest(input); - - OutputStream output = socket.getOutputStream(); - String serverResponse = "" + - "HTTP/1.1 200 OK\r\n" + - "Connection: close\r\n"; - if (chunked) + try (ServerSocket server = new ServerSocket(0)) { - serverResponse += "" + - "Transfer-Encoding: chunked\r\n" + - "\r\n"; + int port = server.getLocalPort(); + + startClient(); + + Request request = client.newRequest("localhost", port).path("/ctx/path"); + FutureResponseListener listener = new FutureResponseListener(request); + request.send(listener); + + try (Socket socket = server.accept()) + { + InputStream input = socket.getInputStream(); + consumeRequest(input); + + OutputStream output = socket.getOutputStream(); + String serverResponse = "" + + "HTTP/1.1 200 OK\r\n" + + "Connection: close\r\n"; + if (chunked) + { + serverResponse += "" + + "Transfer-Encoding: chunked\r\n" + + "\r\n"; for (int i = 0; i < 2; ++i) { serverResponse += Integer.toHexString(content.length()) + "\r\n" + - content + "\r\n"; + content + "\r\n"; } - serverResponse += "" + - "0\r\n" + - "\r\n"; + serverResponse += "" + + "0\r\n" + + "\r\n"; + } + else + { + serverResponse += "Content-Length: " + content.length() + "\r\n"; + serverResponse += "\r\n"; + serverResponse += content; + } + + output.write(serverResponse.getBytes("UTF-8")); + output.flush(); + if (shutdownOutput) + socket.shutdownOutput(); + + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Give some time to process the connection. + Thread.sleep(1000); + + // Connection should have been removed from pool. + HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + Assert.assertEquals(0, connectionPool.getConnectionCount()); + Assert.assertEquals(0, connectionPool.getIdleConnectionCount()); + Assert.assertEquals(0, connectionPool.getActiveConnectionCount()); + } } - else - { - serverResponse += "Content-Length: " + content.length() + "\r\n"; - serverResponse += "\r\n"; - serverResponse += content; - } - - output.write(serverResponse.getBytes("UTF-8")); - output.flush(); - if (shutdownOutput) - socket.shutdownOutput(); - - ContentResponse response = listener.get(5, TimeUnit.SECONDS); - Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); - - // Give some time to process the connection. - Thread.sleep(1000); - - // Connection should have been removed from pool. - HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - Assert.assertEquals(0, connectionPool.getConnectionCount()); - Assert.assertEquals(0, connectionPool.getIdleConnectionCount()); - Assert.assertEquals(0, connectionPool.getActiveConnectionCount()); } private boolean consumeRequest(InputStream input) throws IOException diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/Socks4ProxyTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/Socks4ProxyTest.java index b3f6deddf8b..754bd6aa97e 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/Socks4ProxyTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/Socks4ProxyTest.java @@ -79,41 +79,40 @@ public class Socks4ProxyTest latch.countDown(); }); - SocketChannel channel = server.accept(); + try (SocketChannel channel = server.accept()) + { + int socks4MessageLength = 9; + ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength); + int read = channel.read(buffer); + Assert.assertEquals(socks4MessageLength, read); + Assert.assertEquals(4, buffer.get(0) & 0xFF); + Assert.assertEquals(1, buffer.get(1) & 0xFF); + Assert.assertEquals(serverPort, buffer.getShort(2) & 0xFFFF); + Assert.assertEquals(ip1, buffer.get(4) & 0xFF); + Assert.assertEquals(ip2, buffer.get(5) & 0xFF); + Assert.assertEquals(ip3, buffer.get(6) & 0xFF); + Assert.assertEquals(ip4, buffer.get(7) & 0xFF); + Assert.assertEquals(0, buffer.get(8) & 0xFF); - int socks4MessageLength = 9; - ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength); - int read = channel.read(buffer); - Assert.assertEquals(socks4MessageLength, read); - Assert.assertEquals(4, buffer.get(0) & 0xFF); - Assert.assertEquals(1, buffer.get(1) & 0xFF); - Assert.assertEquals(serverPort, buffer.getShort(2) & 0xFFFF); - Assert.assertEquals(ip1, buffer.get(4) & 0xFF); - Assert.assertEquals(ip2, buffer.get(5) & 0xFF); - Assert.assertEquals(ip3, buffer.get(6) & 0xFF); - Assert.assertEquals(ip4, buffer.get(7) & 0xFF); - Assert.assertEquals(0, buffer.get(8) & 0xFF); + // Socks4 response. + channel.write(ByteBuffer.wrap(new byte[]{0, 0x5A, 0, 0, 0, 0, 0, 0})); - // Socks4 response. - channel.write(ByteBuffer.wrap(new byte[]{0, 0x5A, 0, 0, 0, 0, 0, 0})); + buffer = ByteBuffer.allocate(method.length() + 1 + path.length()); + read = channel.read(buffer); + Assert.assertEquals(buffer.capacity(), read); + buffer.flip(); + Assert.assertEquals(method + " " + path, StandardCharsets.UTF_8.decode(buffer).toString()); - buffer = ByteBuffer.allocate(method.length() + 1 + path.length()); - read = channel.read(buffer); - Assert.assertEquals(buffer.capacity(), read); - buffer.flip(); - Assert.assertEquals(method + " " + path, StandardCharsets.UTF_8.decode(buffer).toString()); + // Response + String response = "" + + "HTTP/1.1 200 OK\r\n" + + "Content-Length: 0\r\n" + + "Connection: close\r\n" + + "\r\n"; + channel.write(ByteBuffer.wrap(response.getBytes("UTF-8"))); - // Response - String response = "" + - "HTTP/1.1 200 OK\r\n" + - "Content-Length: 0\r\n" + - "Connection: close\r\n" + - "\r\n"; - channel.write(ByteBuffer.wrap(response.getBytes("UTF-8"))); - - Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - - channel.close(); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } } @Test @@ -139,39 +138,38 @@ public class Socks4ProxyTest result.getFailure().printStackTrace(); }); - SocketChannel channel = server.accept(); + try (SocketChannel channel = server.accept()) + { + int socks4MessageLength = 9; + ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength); + int read = channel.read(buffer); + Assert.assertEquals(socks4MessageLength, read); - int socks4MessageLength = 9; - ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength); - int read = channel.read(buffer); - Assert.assertEquals(socks4MessageLength, read); + // Socks4 response, with split bytes. + byte[] chunk1 = new byte[]{0, 0x5A, 0}; + byte[] chunk2 = new byte[]{0, 0, 0, 0, 0}; + channel.write(ByteBuffer.wrap(chunk1)); - // Socks4 response, with split bytes. - byte[] chunk1 = new byte[]{0, 0x5A, 0}; - byte[] chunk2 = new byte[]{0, 0, 0, 0, 0}; - channel.write(ByteBuffer.wrap(chunk1)); + // Wait before sending the second chunk. + Thread.sleep(1000); - // Wait before sending the second chunk. - Thread.sleep(1000); + channel.write(ByteBuffer.wrap(chunk2)); - channel.write(ByteBuffer.wrap(chunk2)); + buffer = ByteBuffer.allocate(method.length()); + read = channel.read(buffer); + Assert.assertEquals(buffer.capacity(), read); + buffer.flip(); + Assert.assertEquals(method, StandardCharsets.UTF_8.decode(buffer).toString()); - buffer = ByteBuffer.allocate(method.length()); - read = channel.read(buffer); - Assert.assertEquals(buffer.capacity(), read); - buffer.flip(); - Assert.assertEquals(method, StandardCharsets.UTF_8.decode(buffer).toString()); + // Response + String response = "" + + "HTTP/1.1 200 OK\r\n" + + "Content-Length: 0\r\n" + + "Connection: close\r\n" + + "\r\n"; + channel.write(ByteBuffer.wrap(response.getBytes("UTF-8"))); - // Response - String response = "" + - "HTTP/1.1 200 OK\r\n" + - "Content-Length: 0\r\n" + - "Connection: close\r\n" + - "\r\n"; - channel.write(ByteBuffer.wrap(response.getBytes("UTF-8"))); - - Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - - channel.close(); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java index a5d87cb3c14..c6678d1c75f 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/TLSServerConnectionCloseTest.java @@ -104,87 +104,91 @@ public class TLSServerConnectionCloseTest private void testServerSendsConnectionClose(boolean chunked, String content) throws Exception { - ServerSocket server = new ServerSocket(0); - int port = server.getLocalPort(); - - startClient(); - - Request request = client.newRequest("localhost", port).scheme("https").path("/ctx/path"); - FutureResponseListener listener = new FutureResponseListener(request); - request.send(listener); - - Socket socket = server.accept(); - SSLContext sslContext = client.getSslContextFactory().getSslContext(); - SSLSocket sslSocket = (SSLSocket)sslContext.getSocketFactory().createSocket(socket, "localhost", port, false); - sslSocket.setUseClientMode(false); - sslSocket.startHandshake(); - - InputStream input = sslSocket.getInputStream(); - consumeRequest(input); - - OutputStream output = sslSocket.getOutputStream(); - String serverResponse = "" + - "HTTP/1.1 200 OK\r\n" + - "Connection: close\r\n"; - if (chunked) + try (ServerSocket server = new ServerSocket(0)) { - serverResponse += "" + - "Transfer-Encoding: chunked\r\n" + - "\r\n"; + int port = server.getLocalPort(); + + startClient(); + + Request request = client.newRequest("localhost", port).scheme("https").path("/ctx/path"); + FutureResponseListener listener = new FutureResponseListener(request); + request.send(listener); + + try (Socket socket = server.accept()) + { + SSLContext sslContext = client.getSslContextFactory().getSslContext(); + SSLSocket sslSocket = (SSLSocket)sslContext.getSocketFactory().createSocket(socket, "localhost", port, false); + sslSocket.setUseClientMode(false); + sslSocket.startHandshake(); + + InputStream input = sslSocket.getInputStream(); + consumeRequest(input); + + OutputStream output = sslSocket.getOutputStream(); + String serverResponse = "" + + "HTTP/1.1 200 OK\r\n" + + "Connection: close\r\n"; + if (chunked) + { + serverResponse += "" + + "Transfer-Encoding: chunked\r\n" + + "\r\n"; for (int i = 0; i < 2; ++i) { serverResponse += Integer.toHexString(content.length()) + "\r\n" + - content + "\r\n"; + content + "\r\n"; } - serverResponse += "" + - "0\r\n" + - "\r\n"; - } - else - { - serverResponse += "Content-Length: " + content.length() + "\r\n"; - serverResponse += "\r\n"; - serverResponse += content; - } + serverResponse += "" + + "0\r\n" + + "\r\n"; + } + else + { + serverResponse += "Content-Length: " + content.length() + "\r\n"; + serverResponse += "\r\n"; + serverResponse += content; + } - output.write(serverResponse.getBytes("UTF-8")); - output.flush(); + output.write(serverResponse.getBytes("UTF-8")); + output.flush(); - switch (closeMode) - { - case NONE: - { - break; - } - case CLOSE: - { - sslSocket.close(); - break; - } - case ABRUPT: - { - socket.shutdownOutput(); - break; - } - default: - { - throw new IllegalStateException(); + switch (closeMode) + { + case NONE: + { + break; + } + case CLOSE: + { + sslSocket.close(); + break; + } + case ABRUPT: + { + socket.shutdownOutput(); + break; + } + default: + { + throw new IllegalStateException(); + } + } + + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Give some time to process the connection. + Thread.sleep(1000); + + // Connection should have been removed from pool. + HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + Assert.assertEquals(0, connectionPool.getConnectionCount()); + Assert.assertEquals(0, connectionPool.getIdleConnectionCount()); + Assert.assertEquals(0, connectionPool.getActiveConnectionCount()); } } - - ContentResponse response = listener.get(5, TimeUnit.SECONDS); - Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); - - // Give some time to process the connection. - Thread.sleep(1000); - - // Connection should have been removed from pool. - HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - Assert.assertEquals(0, connectionPool.getConnectionCount()); - Assert.assertEquals(0, connectionPool.getIdleConnectionCount()); - Assert.assertEquals(0, connectionPool.getActiveConnectionCount()); } private boolean consumeRequest(InputStream input) throws IOException diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesClientTest.java index 83684dd8397..64610fbfb7a 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesClientTest.java @@ -101,72 +101,72 @@ public class SslBytesClientTest extends SslBytesTest Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS)); - final SSLSocket server = (SSLSocket)acceptor.accept(); - server.setUseClientMode(false); - - Future handshake = threadPool.submit(() -> + try (SSLSocket server = (SSLSocket)acceptor.accept()) { - server.startHandshake(); - return null; - }); + server.setUseClientMode(false); - // Client Hello - TLSRecord record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToServer(record); + Future handshake = threadPool.submit(() -> + { + server.startHandshake(); + return null; + }); - // Server Hello + Certificate + Server Done - record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToClient(record); + // Client Hello + TLSRecord record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToServer(record); - // Client Key Exchange - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToServer(record); + // Server Hello + Certificate + Server Done + record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToClient(record); - // Change Cipher Spec - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); - proxy.flushToServer(record); + // Client Key Exchange + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToServer(record); - // Client Done - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToServer(record); + // Change Cipher Spec + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); + proxy.flushToServer(record); - // Change Cipher Spec - record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); - proxy.flushToClient(record); + // Client Done + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToServer(record); - // Server Done - record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToClient(record); + // Change Cipher Spec + record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); + proxy.flushToClient(record); - Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); + // Server Done + record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToClient(record); - SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); - // Read request - BufferedReader reader = new BufferedReader(new InputStreamReader(server.getInputStream(), StandardCharsets.UTF_8)); - String line = reader.readLine(); - Assert.assertTrue(line.startsWith("GET")); - while (line.length() > 0) - line = reader.readLine(); + Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); - // Write response - OutputStream output = server.getOutputStream(); - output.write(("HTTP/1.1 200 OK\r\n" + - "Content-Length: 0\r\n" + - "\r\n").getBytes(StandardCharsets.UTF_8)); - output.flush(); - Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); + SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); + // Read request + BufferedReader reader = new BufferedReader(new InputStreamReader(server.getInputStream(), StandardCharsets.UTF_8)); + String line = reader.readLine(); + Assert.assertTrue(line.startsWith("GET")); + while (line.length() > 0) + line = reader.readLine(); - ContentResponse response = listener.get(5, TimeUnit.SECONDS); - Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + // Write response + OutputStream output = server.getOutputStream(); + output.write(("HTTP/1.1 200 OK\r\n" + + "Content-Length: 0\r\n" + + "\r\n").getBytes(StandardCharsets.UTF_8)); + output.flush(); + Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); - server.close(); + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + } } @Test @@ -178,109 +178,109 @@ public class SslBytesClientTest extends SslBytesTest Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS)); - final SSLSocket server = (SSLSocket)acceptor.accept(); - server.setUseClientMode(false); - - Future handshake = threadPool.submit(() -> + try (SSLSocket server = (SSLSocket)acceptor.accept()) { - server.startHandshake(); - return null; - }); + server.setUseClientMode(false); - SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); - Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); + Future handshake = threadPool.submit(() -> + { + server.startHandshake(); + return null; + }); - // Read request - InputStream serverInput = server.getInputStream(); - BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8)); - String line = reader.readLine(); - Assert.assertTrue(line.startsWith("GET")); - while (line.length() > 0) - line = reader.readLine(); + SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); + Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); - OutputStream serverOutput = server.getOutputStream(); - byte[] data1 = new byte[1024]; - Arrays.fill(data1, (byte)'X'); - String content1 = new String(data1, StandardCharsets.UTF_8); - byte[] data2 = new byte[1024]; - Arrays.fill(data2, (byte)'Y'); - final String content2 = new String(data2, StandardCharsets.UTF_8); - // Write first part of the response - serverOutput.write(("HTTP/1.1 200 OK\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + (content1.length() + content2.length()) + "\r\n" + - "\r\n" + - content1).getBytes(StandardCharsets.UTF_8)); - serverOutput.flush(); - Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); + // Read request + InputStream serverInput = server.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8)); + String line = reader.readLine(); + Assert.assertTrue(line.startsWith("GET")); + while (line.length() > 0) + line = reader.readLine(); - // Renegotiate - Future renegotiation = threadPool.submit(() -> - { - server.startHandshake(); - return null; - }); + OutputStream serverOutput = server.getOutputStream(); + byte[] data1 = new byte[1024]; + Arrays.fill(data1, (byte)'X'); + String content1 = new String(data1, StandardCharsets.UTF_8); + byte[] data2 = new byte[1024]; + Arrays.fill(data2, (byte)'Y'); + final String content2 = new String(data2, StandardCharsets.UTF_8); + // Write first part of the response + serverOutput.write(("HTTP/1.1 200 OK\r\n" + + "Content-Type: text/plain\r\n" + + "Content-Length: " + (content1.length() + content2.length()) + "\r\n" + + "\r\n" + + content1).getBytes(StandardCharsets.UTF_8)); + serverOutput.flush(); + Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); - // Renegotiation Handshake - TLSRecord record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToClient(record); + // Renegotiate + Future renegotiation = threadPool.submit(() -> + { + server.startHandshake(); + return null; + }); - // Renegotiation Handshake - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToServer(record); + // Renegotiation Handshake + TLSRecord record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToClient(record); - // Trigger a read to have the server write the final renegotiation steps - server.setSoTimeout(100); - try - { - serverInput.read(); - Assert.fail(); + // Renegotiation Handshake + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToServer(record); + + // Trigger a read to have the server write the final renegotiation steps + server.setSoTimeout(100); + try + { + serverInput.read(); + Assert.fail(); + } + catch (SocketTimeoutException x) + { + // Expected + } + + // Renegotiation Handshake + record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToClient(record); + + // Renegotiation Change Cipher + record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); + proxy.flushToClient(record); + + // Renegotiation Handshake + record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToClient(record); + + // Renegotiation Change Cipher + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); + proxy.flushToServer(record); + + // Renegotiation Handshake + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToServer(record); + + Assert.assertNull(renegotiation.get(5, TimeUnit.SECONDS)); + + // Complete the response + automaticProxyFlow = proxy.startAutomaticFlow(); + serverOutput.write(data2); + serverOutput.flush(); + Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); + + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + Assert.assertEquals(data1.length + data2.length, response.getContent().length); } - catch (SocketTimeoutException x) - { - // Expected - } - - // Renegotiation Handshake - record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToClient(record); - - // Renegotiation Change Cipher - record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); - proxy.flushToClient(record); - - // Renegotiation Handshake - record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToClient(record); - - // Renegotiation Change Cipher - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.CHANGE_CIPHER_SPEC, record.getType()); - proxy.flushToServer(record); - - // Renegotiation Handshake - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToServer(record); - - Assert.assertNull(renegotiation.get(5, TimeUnit.SECONDS)); - - // Complete the response - automaticProxyFlow = proxy.startAutomaticFlow(); - serverOutput.write(data2); - serverOutput.flush(); - Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); - - ContentResponse response = listener.get(5, TimeUnit.SECONDS); - Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); - Assert.assertEquals(data1.length + data2.length, response.getContent().length); - - server.close(); } @Test @@ -294,60 +294,60 @@ public class SslBytesClientTest extends SslBytesTest Assert.assertTrue(proxy.awaitClient(5, TimeUnit.SECONDS)); - final SSLSocket server = (SSLSocket)acceptor.accept(); - server.setUseClientMode(false); - - Future handshake = threadPool.submit(() -> + try (SSLSocket server = (SSLSocket)acceptor.accept()) { - server.startHandshake(); - return null; - }); + server.setUseClientMode(false); - SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); - Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); + Future handshake = threadPool.submit(() -> + { + server.startHandshake(); + return null; + }); - // Read request - InputStream serverInput = server.getInputStream(); - BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8)); - String line = reader.readLine(); - Assert.assertTrue(line.startsWith("GET")); - while (line.length() > 0) - line = reader.readLine(); + SimpleProxy.AutomaticFlow automaticProxyFlow = proxy.startAutomaticFlow(); + Assert.assertNull(handshake.get(5, TimeUnit.SECONDS)); - OutputStream serverOutput = server.getOutputStream(); - byte[] data1 = new byte[1024]; - Arrays.fill(data1, (byte)'X'); - String content1 = new String(data1, StandardCharsets.UTF_8); - byte[] data2 = new byte[1024]; - Arrays.fill(data2, (byte)'Y'); - final String content2 = new String(data2, StandardCharsets.UTF_8); - // Write first part of the response - serverOutput.write(("HTTP/1.1 200 OK\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + (content1.length() + content2.length()) + "\r\n" + - "\r\n" + - content1).getBytes(StandardCharsets.UTF_8)); - serverOutput.flush(); - Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); + // Read request + InputStream serverInput = server.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput, StandardCharsets.UTF_8)); + String line = reader.readLine(); + Assert.assertTrue(line.startsWith("GET")); + while (line.length() > 0) + line = reader.readLine(); - // Renegotiate - threadPool.submit(() -> - { - server.startHandshake(); - return null; - }); + OutputStream serverOutput = server.getOutputStream(); + byte[] data1 = new byte[1024]; + Arrays.fill(data1, (byte)'X'); + String content1 = new String(data1, StandardCharsets.UTF_8); + byte[] data2 = new byte[1024]; + Arrays.fill(data2, (byte)'Y'); + final String content2 = new String(data2, StandardCharsets.UTF_8); + // Write first part of the response + serverOutput.write(("HTTP/1.1 200 OK\r\n" + + "Content-Type: text/plain\r\n" + + "Content-Length: " + (content1.length() + content2.length()) + "\r\n" + + "\r\n" + + content1).getBytes(StandardCharsets.UTF_8)); + serverOutput.flush(); + Assert.assertTrue(automaticProxyFlow.stop(5, TimeUnit.SECONDS)); - // Renegotiation Handshake - TLSRecord record = proxy.readFromServer(); - Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); - proxy.flushToClient(record); + // Renegotiate + threadPool.submit(() -> + { + server.startHandshake(); + return null; + }); - // Client sends close alert. - record = proxy.readFromClient(); - Assert.assertEquals(TLSRecord.Type.ALERT, record.getType()); - record = proxy.readFromClient(); - Assert.assertNull(record); + // Renegotiation Handshake + TLSRecord record = proxy.readFromServer(); + Assert.assertEquals(TLSRecord.Type.HANDSHAKE, record.getType()); + proxy.flushToClient(record); - server.close(); + // Client sends close alert. + record = proxy.readFromClient(); + Assert.assertEquals(TLSRecord.Type.ALERT, record.getType()); + record = proxy.readFromClient(); + Assert.assertNull(record); + } } } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java index 7be8f698943..a56cb15c924 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InvalidServerTest.java @@ -54,22 +54,24 @@ public class InvalidServerTest extends AbstractTest } }, promise); - Socket socket = server.accept(); - OutputStream output = socket.getOutputStream(); - output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8)); - - Session session = promise.get(5, TimeUnit.SECONDS); - Assert.assertNotNull(session); - - Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); - - // Verify that the client closed the socket. - InputStream input = socket.getInputStream(); - while (true) + try (Socket socket = server.accept()) { - int read = input.read(); - if (read < 0) - break; + OutputStream output = socket.getOutputStream(); + output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8)); + + Session session = promise.get(5, TimeUnit.SECONDS); + Assert.assertNotNull(session); + + Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); + + // Verify that the client closed the socket. + InputStream input = socket.getInputStream(); + while (true) + { + int read = input.read(); + if (read < 0) + break; + } } } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java index 21e96e4d3aa..9e055097d91 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java @@ -18,14 +18,6 @@ package org.eclipse.jetty.io; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -52,10 +44,17 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IO; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class IOTest { @Test @@ -73,325 +72,312 @@ public class IOTest @Test public void testHalfClose() throws Exception { - ServerSocket connector = new ServerSocket(0); - - Socket client = new Socket("localhost", connector.getLocalPort()); - Socket server = connector.accept(); - - // we can write both ways - client.getOutputStream().write(1); - assertEquals(1, server.getInputStream().read()); - server.getOutputStream().write(1); - assertEquals(1, client.getInputStream().read()); - - // shutdown output results in read -1 - client.shutdownOutput(); - assertEquals(-1, server.getInputStream().read()); - - // Even though EOF has been read, the server input is not seen as shutdown - assertFalse(server.isInputShutdown()); - - // and we can read -1 again - assertEquals(-1, server.getInputStream().read()); - - // but cannot write - try + try (ServerSocket connector = new ServerSocket(0); + Socket client = new Socket("localhost", connector.getLocalPort()); + Socket server = connector.accept()) { + // we can write both ways client.getOutputStream().write(1); - fail("exception expected"); - } - catch (SocketException e) - { - } - - // but can still write in opposite direction. - server.getOutputStream().write(1); - assertEquals(1, client.getInputStream().read()); - - // server can shutdown input to match the shutdown out of client - server.shutdownInput(); - - // now we EOF instead of reading -1 - try - { - server.getInputStream().read(); - fail("exception expected"); - } - catch (SocketException e) - { - } - - // but can still write in opposite direction. - server.getOutputStream().write(1); - assertEquals(1, client.getInputStream().read()); - - // client can shutdown input - client.shutdownInput(); - - // now we EOF instead of reading -1 - try - { - client.getInputStream().read(); - fail("exception expected"); - } - catch (SocketException e) - { - } - - // But we can still write at the server (data which will never be read) - server.getOutputStream().write(1); - - // and the server output is not shutdown - assertFalse(server.isOutputShutdown()); - - // until we explictly shut it down - server.shutdownOutput(); - - // and now we can't write - try - { + assertEquals(1, server.getInputStream().read()); server.getOutputStream().write(1); - fail("exception expected"); + assertEquals(1, client.getInputStream().read()); + + // shutdown output results in read -1 + client.shutdownOutput(); + assertEquals(-1, server.getInputStream().read()); + + // Even though EOF has been read, the server input is not seen as shutdown + assertFalse(server.isInputShutdown()); + + // and we can read -1 again + assertEquals(-1, server.getInputStream().read()); + + // but cannot write + try + { + client.getOutputStream().write(1); + fail("exception expected"); + } + catch (SocketException expected) + { + } + + // but can still write in opposite direction. + server.getOutputStream().write(1); + assertEquals(1, client.getInputStream().read()); + + // server can shutdown input to match the shutdown out of client + server.shutdownInput(); + + // now we EOF instead of reading -1 + try + { + server.getInputStream().read(); + fail("exception expected"); + } + catch (SocketException expected) + { + } + + // but can still write in opposite direction. + server.getOutputStream().write(1); + assertEquals(1, client.getInputStream().read()); + + // client can shutdown input + client.shutdownInput(); + + // now we EOF instead of reading -1 + try + { + client.getInputStream().read(); + fail("exception expected"); + } + catch (SocketException expected) + { + } + + // But we can still write at the server (data which will never be read) + server.getOutputStream().write(1); + + // and the server output is not shutdown + assertFalse(server.isOutputShutdown()); + + // until we explictly shut it down + server.shutdownOutput(); + + // and now we can't write + try + { + server.getOutputStream().write(1); + fail("exception expected"); + } + catch (SocketException expected) + { + } + + // but the sockets are still open + assertFalse(client.isClosed()); + assertFalse(server.isClosed()); + + // but if we close one end + client.close(); + + // it is seen as closed. + assertTrue(client.isClosed()); + + // but not the other end + assertFalse(server.isClosed()); + + // which has to be closed explictly + server.close(); + assertTrue(server.isClosed()); } - catch (SocketException e) - { - } - - // but the sockets are still open - assertFalse(client.isClosed()); - assertFalse(server.isClosed()); - - // but if we close one end - client.close(); - - // it is seen as closed. - assertTrue(client.isClosed()); - - // but not the other end - assertFalse(server.isClosed()); - - // which has to be closed explictly - server.close(); - assertTrue(server.isClosed()); } @Test public void testHalfCloseClientServer() throws Exception { - ServerSocketChannel connector = ServerSocketChannel.open(); - connector.socket().bind(null); - - Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket(); - client.setSoTimeout(1000); - client.setSoLinger(false, -1); - Socket server = connector.accept().socket(); - server.setSoTimeout(1000); - server.setSoLinger(false, -1); - - // Write from client to server - client.getOutputStream().write(1); - - // Server reads - assertEquals(1, server.getInputStream().read()); - - // Write from server to client with oshut - server.getOutputStream().write(1); - // System.err.println("OSHUT "+server); - server.shutdownOutput(); - - // Client reads response - assertEquals(1, client.getInputStream().read()); - - try + try (ServerSocketChannel connector = ServerSocketChannel.open()) { - // Client reads -1 and does ishut - assertEquals(-1, client.getInputStream().read()); - assertFalse(client.isInputShutdown()); - //System.err.println("ISHUT "+client); - client.shutdownInput(); - - // Client ??? - //System.err.println("OSHUT "+client); - client.shutdownOutput(); - //System.err.println("CLOSE "+client); - client.close(); - - // Server reads -1, does ishut and then close - assertEquals(-1, server.getInputStream().read()); - assertFalse(server.isInputShutdown()); - //System.err.println("ISHUT "+server); - - try + connector.socket().bind(null); + try (Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket()) { - server.shutdownInput(); - } - catch (SocketException e) - { - // System.err.println(e); - } - //System.err.println("CLOSE "+server); - server.close(); + client.setSoTimeout(1000); + client.setSoLinger(false, -1); + try (Socket server = connector.accept().socket()) + { + server.setSoTimeout(1000); + server.setSoLinger(false, -1); - } - catch (Exception e) - { - System.err.println(e); - assertTrue(OS.IS_OSX); + // Write from client to server + client.getOutputStream().write(1); + + // Server reads + assertEquals(1, server.getInputStream().read()); + + // Write from server to client with oshut + server.getOutputStream().write(1); + // System.err.println("OSHUT "+server); + server.shutdownOutput(); + + // Client reads response + assertEquals(1, client.getInputStream().read()); + + try + { + // Client reads -1 and does ishut + assertEquals(-1, client.getInputStream().read()); + assertFalse(client.isInputShutdown()); + //System.err.println("ISHUT "+client); + client.shutdownInput(); + + // Client ??? + //System.err.println("OSHUT "+client); + client.shutdownOutput(); + //System.err.println("CLOSE "+client); + client.close(); + + // Server reads -1, does ishut and then close + assertEquals(-1, server.getInputStream().read()); + assertFalse(server.isInputShutdown()); + //System.err.println("ISHUT "+server); + + server.shutdownInput(); + server.close(); + } + catch (Exception e) + { + e.printStackTrace(); + assertTrue(OS.IS_OSX); + } + } + } } } @Test public void testHalfCloseBadClient() throws Exception { - ServerSocketChannel connector = ServerSocketChannel.open(); - connector.socket().bind(null); - - Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket(); - client.setSoTimeout(1000); - client.setSoLinger(false, -1); - Socket server = connector.accept().socket(); - server.setSoTimeout(1000); - server.setSoLinger(false, -1); - - // Write from client to server - client.getOutputStream().write(1); - - // Server reads - assertEquals(1, server.getInputStream().read()); - - // Write from server to client with oshut - server.getOutputStream().write(1); - //System.err.println("OSHUT "+server); - server.shutdownOutput(); - - try + try (ServerSocketChannel connector = ServerSocketChannel.open()) { - // Client reads response - assertEquals(1, client.getInputStream().read()); + connector.socket().bind(null); - // Client reads -1 - assertEquals(-1, client.getInputStream().read()); - assertFalse(client.isInputShutdown()); - - // Client can still write as we are half closed - client.getOutputStream().write(1); - - // Server can still read - assertEquals(1, server.getInputStream().read()); - - // Server now closes - server.close(); - - // Client still reads -1 (not broken pipe !!) - assertEquals(-1, client.getInputStream().read()); - assertFalse(client.isInputShutdown()); - - Thread.sleep(100); - - // Client still reads -1 (not broken pipe !!) - assertEquals(-1, client.getInputStream().read()); - assertFalse(client.isInputShutdown()); - - // Client can still write data even though server is closed??? - client.getOutputStream().write(1); - - // Client eventually sees Broken Pipe - int i = 0; - try + try (Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket()) { - for (i = 0; i < 100000; i++) + client.setSoTimeout(1000); + client.setSoLinger(false, -1); + try (Socket server = connector.accept().socket()) + { + server.setSoTimeout(1000); + server.setSoLinger(false, -1); + + // Write from client to server client.getOutputStream().write(1); - Assert.fail(); - } - catch (IOException e) - { - } - client.close(); + // Server reads + assertEquals(1, server.getInputStream().read()); - } - catch (Exception e) - { - System.err.println("PLEASE INVESTIGATE:"); - e.printStackTrace(); + // Write from server to client with oshut + server.getOutputStream().write(1); + //System.err.println("OSHUT "+server); + server.shutdownOutput(); + + // Client reads response + assertEquals(1, client.getInputStream().read()); + + // Client reads -1 + assertEquals(-1, client.getInputStream().read()); + assertFalse(client.isInputShutdown()); + + // Client can still write as we are half closed + client.getOutputStream().write(1); + + // Server can still read + assertEquals(1, server.getInputStream().read()); + + // Server now closes + server.close(); + + // Client still reads -1 (not broken pipe !!) + assertEquals(-1, client.getInputStream().read()); + assertFalse(client.isInputShutdown()); + + Thread.sleep(100); + + // Client still reads -1 (not broken pipe !!) + assertEquals(-1, client.getInputStream().read()); + assertFalse(client.isInputShutdown()); + + // Client can still write data even though server is closed??? + client.getOutputStream().write(1); + + // Client eventually sees Broken Pipe + try + { + for (int i = 0; i < 100000; i++) + client.getOutputStream().write(1); + Assert.fail(); + } + catch (IOException expected) + { + } + } + } } } @Test public void testServerChannelInterrupt() throws Exception { - final ServerSocketChannel connector = ServerSocketChannel.open(); - connector.configureBlocking(true); - connector.socket().bind(null); - - Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket(); - client.setSoTimeout(2000); - client.setSoLinger(false, -1); - Socket server = connector.accept().socket(); - server.setSoTimeout(2000); - server.setSoLinger(false, -1); - - // Write from client to server - client.getOutputStream().write(1); - // Server reads - assertEquals(1, server.getInputStream().read()); - - // Write from server to client - server.getOutputStream().write(1); - // Client reads - assertEquals(1, client.getInputStream().read()); - - - // block a thread in accept - final CountDownLatch alatch=new CountDownLatch(2); - Thread acceptor = new Thread() + try (ServerSocketChannel connector = ServerSocketChannel.open()) { - @Override - public void run() + connector.configureBlocking(true); + connector.socket().bind(null); + try (Socket client = SocketChannel.open(connector.socket().getLocalSocketAddress()).socket()) { - try + client.setSoTimeout(2000); + client.setSoLinger(false, -1); + try (Socket server = connector.accept().socket()) { - alatch.countDown(); - connector.accept(); - } - catch (Throwable e) - { - } - finally - { - alatch.countDown(); + server.setSoTimeout(2000); + server.setSoLinger(false, -1); + + // Write from client to server + client.getOutputStream().write(1); + // Server reads + assertEquals(1, server.getInputStream().read()); + + // Write from server to client + server.getOutputStream().write(1); + // Client reads + assertEquals(1, client.getInputStream().read()); + + // block a thread in accept + final CountDownLatch latch = new CountDownLatch(2); + Thread acceptor = new Thread(() -> + { + try + { + latch.countDown(); + connector.accept(); + } + catch (Throwable ignored) + { + } + finally + { + latch.countDown(); + } + }); + acceptor.start(); + while (latch.getCount() == 2) + Thread.sleep(10); + + // interrupt the acceptor + acceptor.interrupt(); + + // wait for acceptor to exit + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + // connector is closed + assertFalse(connector.isOpen()); + + // but connection is still open + assertFalse(client.isClosed()); + assertFalse(server.isClosed()); + + // Write from client to server + client.getOutputStream().write(42); + // Server reads + assertEquals(42, server.getInputStream().read()); + + // Write from server to client + server.getOutputStream().write(43); + // Client reads + assertEquals(43, client.getInputStream().read()); } } - }; - acceptor.start(); - while (alatch.getCount()==2) - Thread.sleep(10); - - // interrupt the acceptor - acceptor.interrupt(); - - // wait for acceptor to exit - assertTrue(alatch.await(10,TimeUnit.SECONDS)); - - // connector is closed - assertFalse(connector.isOpen()); - - // but connection is still open - assertFalse(client.isClosed()); - assertFalse(server.isClosed()); - - // Write from client to server - client.getOutputStream().write(42); - // Server reads - assertEquals(42, server.getInputStream().read()); - - // Write from server to client - server.getOutputStream().write(43); - // Client reads - assertEquals(43, client.getInputStream().read()); - - client.close(); - + } } @@ -401,7 +387,7 @@ public class IOTest { try (ServerSocket connector = new ServerSocket(0); Socket client = new Socket("127.0.0.1", connector.getLocalPort()); - Socket server = connector.accept();) + Socket server = connector.accept()) { client.setTcpNoDelay(true); client.setSoLinger(true, 0); @@ -430,8 +416,8 @@ public class IOTest assertEquals(-1, server.getInputStream().read()); server.shutdownInput(); - // Since output was already shutdown, server closes - server.close(); + // Since output was already shutdown, server + // closes in the try-with-resources block end. } } @@ -490,52 +476,48 @@ public class IOTest assertEquals(expected,wrote); - for (int i=0;i { - Thread.sleep(100); - selector.wakeup(); - } - catch(Exception e) - { - e.printStackTrace(); - } + try + { + Thread.sleep(100); + selector.wakeup(); + } + catch (Exception e) + { + e.printStackTrace(); + } + }).start(); + assertThat(selector.select(), is(0)); } - - }.start(); - assertThat(selector.select(), is(0)); + } } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java index 11ac5f4a018..41fa75b364f 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/NIOTest.java @@ -18,10 +18,6 @@ package org.eclipse.jetty.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; @@ -31,106 +27,105 @@ import java.nio.channels.SocketChannel; import org.junit.Test; -/** - * - */ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class NIOTest { @Test public void testSelector() throws Exception { - ServerSocket acceptor = new ServerSocket(0); + try (ServerSocket acceptor = new ServerSocket(0); + Selector selector = Selector.open(); + SocketChannel client = SocketChannel.open(acceptor.getLocalSocketAddress()); + Socket server = acceptor.accept()) + { + server.setTcpNoDelay(true); - Selector selector = Selector.open(); + // Make the client non blocking and register it with selector for reads + client.configureBlocking(false); + SelectionKey key = client.register(selector, SelectionKey.OP_READ); - // Create client server socket pair - SocketChannel client = SocketChannel.open(acceptor.getLocalSocketAddress()); - Socket server = acceptor.accept(); - server.setTcpNoDelay(true); + // assert it is not selected + assertTrue(key.isValid()); + assertFalse(key.isReadable()); + assertEquals(0, key.readyOps()); - // Make the client non blocking and register it with selector for reads - client.configureBlocking(false); - SelectionKey key = client.register(selector,SelectionKey.OP_READ); + // try selecting and assert nothing selected + int selected = selector.selectNow(); + assertEquals(0, selected); + assertEquals(0, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertFalse(key.isReadable()); + assertEquals(0, key.readyOps()); - // assert it is not selected - assertTrue(key.isValid()); - assertFalse(key.isReadable()); - assertEquals(0,key.readyOps()); + // Write a byte from server to client + server.getOutputStream().write(42); + server.getOutputStream().flush(); - // try selecting and assert nothing selected - int selected = selector.selectNow(); - assertEquals(0,selected); - assertEquals(0,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertFalse(key.isReadable()); - assertEquals(0,key.readyOps()); + // select again and assert selection found for read + selected = selector.select(1000); + assertEquals(1, selected); + assertEquals(1, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertTrue(key.isReadable()); + assertEquals(1, key.readyOps()); - // Write a byte from server to client - server.getOutputStream().write(42); - server.getOutputStream().flush(); + // select again and see that it is not reselect, but stays selected + selected = selector.select(100); + assertEquals(0, selected); + assertEquals(1, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertTrue(key.isReadable()); + assertEquals(1, key.readyOps()); - // select again and assert selection found for read - selected = selector.select(1000); - assertEquals(1,selected); - assertEquals(1,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertTrue(key.isReadable()); - assertEquals(1,key.readyOps()); + // read the byte + ByteBuffer buf = ByteBuffer.allocate(1024); + int len = client.read(buf); + assertEquals(1, len); + buf.flip(); + assertEquals(42, buf.get()); + buf.clear(); - // select again and see that it is not reselect, but stays selected - selected = selector.select(100); - assertEquals(0,selected); - assertEquals(1,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertTrue(key.isReadable()); - assertEquals(1,key.readyOps()); + // But this does not change the key + assertTrue(key.isValid()); + assertTrue(key.isReadable()); + assertEquals(1, key.readyOps()); - // read the byte - ByteBuffer buf = ByteBuffer.allocate(1024); - int len=client.read(buf); - assertEquals(1,len); - buf.flip(); - assertEquals(42,buf.get()); - buf.clear(); + // Even if we select again ? + selected = selector.select(100); + assertEquals(0, selected); + assertEquals(1, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertTrue(key.isReadable()); + assertEquals(1, key.readyOps()); - // But this does not change the key - assertTrue(key.isValid()); - assertTrue(key.isReadable()); - assertEquals(1,key.readyOps()); + // Unless we remove the key from the select set + // and then it is still flagged as isReadable() + selector.selectedKeys().clear(); + assertEquals(0, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertTrue(key.isReadable()); + assertEquals(1, key.readyOps()); - // Even if we select again ? - selected = selector.select(100); - assertEquals(0,selected); - assertEquals(1,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertTrue(key.isReadable()); - assertEquals(1,key.readyOps()); + // Now if we select again - it is still flagged as readable!!! + selected = selector.select(100); + assertEquals(0, selected); + assertEquals(0, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertTrue(key.isReadable()); + assertEquals(1, key.readyOps()); - // Unless we remove the key from the select set - // and then it is still flagged as isReadable() - selector.selectedKeys().clear(); - assertEquals(0,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertTrue(key.isReadable()); - assertEquals(1,key.readyOps()); - - // Now if we select again - it is still flagged as readable!!! - selected = selector.select(100); - assertEquals(0,selected); - assertEquals(0,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertTrue(key.isReadable()); - assertEquals(1,key.readyOps()); - - // Only when it is selected for something else does that state change. - key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE); - selected = selector.select(1000); - assertEquals(1,selected); - assertEquals(1,selector.selectedKeys().size()); - assertTrue(key.isValid()); - assertTrue(key.isWritable()); - assertFalse(key.isReadable()); - assertEquals(SelectionKey.OP_WRITE,key.readyOps()); + // Only when it is selected for something else does that state change. + key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + selected = selector.select(1000); + assertEquals(1, selected); + assertEquals(1, selector.selectedKeys().size()); + assertTrue(key.isValid()); + assertTrue(key.isWritable()); + assertFalse(key.isReadable()); + assertEquals(SelectionKey.OP_WRITE, key.readyOps()); + } } - } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java index 04e17c9e4a0..f6503d559e4 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointInterestsTest.java @@ -65,7 +65,7 @@ public class SocketChannelEndPointInterestsTest { @Override - protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()) { @@ -176,36 +176,37 @@ public class SocketChannelEndPointInterestsTest } }); - Socket client = new Socket(); - client.connect(connector.getLocalAddress()); - client.setSoTimeout(5000); + try (Socket client = new Socket()) + { + client.connect(connector.getLocalAddress()); + client.setSoTimeout(5000); + try (SocketChannel server = connector.accept()) + { + server.configureBlocking(false); + selectorManager.accept(server); - SocketChannel server = connector.accept(); - server.configureBlocking(false); - selectorManager.accept(server); + OutputStream clientOutput = client.getOutputStream(); + clientOutput.write(1); + clientOutput.flush(); + Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS)); - OutputStream clientOutput = client.getOutputStream(); - clientOutput.write(1); - clientOutput.flush(); - Assert.assertTrue(latch1.await(5, TimeUnit.SECONDS)); + // We do not read to keep the socket write blocked - // We do not read to keep the socket write blocked + clientOutput.write(2); + clientOutput.flush(); + Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS)); - clientOutput.write(2); - clientOutput.flush(); - Assert.assertTrue(latch2.await(5, TimeUnit.SECONDS)); + // Sleep before reading to allow waking up the server only for read + Thread.sleep(1000); - // Sleep before reading to allow waking up the server only for read - Thread.sleep(1000); + // Now read what was written, waking up the server for write + InputStream clientInput = client.getInputStream(); + while (size.getAndDecrement() > 0) + clientInput.read(); - // Now read what was written, waking up the server for write - InputStream clientInput = client.getInputStream(); - while (size.getAndDecrement() > 0) - clientInput.read(); - - client.close(); - - Assert.assertNull(failure.get()); + Assert.assertNull(failure.get()); + } + } } private interface Interested @@ -214,5 +215,4 @@ public class SocketChannelEndPointInterestsTest void onIncompleteFlush(); } - } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java index 017104b7954..0645430f1a4 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java @@ -18,13 +18,6 @@ package org.eclipse.jetty.io; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; @@ -73,6 +66,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + @SuppressWarnings("Duplicates") @RunWith(Parameterized.class) public class SocketChannelEndPointTest @@ -143,60 +143,61 @@ public class SocketChannelEndPointTest @Test public void testEcho() throws Exception { - Socket client = _scenario.newClient(_connector); - - client.setSoTimeout(60000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - - _manager.accept(server); - - // Write client to server - client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); - - // Verify echo server to client - for (char c : "HelloWorld".toCharArray()) + try (Socket client = _scenario.newClient(_connector)) { - int b = client.getInputStream().read(); - assertTrue(b > 0); - assertEquals(c, (char) b); - } + client.setSoTimeout(60000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); - // wait for read timeout - client.setSoTimeout(500); - long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - try - { - client.getInputStream().read(); - Assert.fail(); - } - catch (SocketTimeoutException e) - { - long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start; - Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L)); - } + // Write client to server + client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); - // write then shutdown - client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } - // Verify echo server to client - for (char c : "Goodbye Cruel TLS".toCharArray()) - { - int b = client.getInputStream().read(); - Assert.assertThat("expect valid char integer", b, greaterThan(0)); - assertEquals("expect characters to be same", c, (char) b); - } - client.close(); + // wait for read timeout + client.setSoTimeout(500); + long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + try + { + client.getInputStream().read(); + Assert.fail(); + } + catch (SocketTimeoutException e) + { + long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start; + Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L)); + } - for (int i = 0; i < 10; ++i) - { - if (server.isOpen()) - Thread.sleep(10); - else - break; + // write then shutdown + client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); + + // Verify echo server to client + for (char c : "Goodbye Cruel TLS".toCharArray()) + { + int b = client.getInputStream().read(); + Assert.assertThat("expect valid char integer", b, greaterThan(0)); + assertEquals("expect characters to be same", c, (char)b); + } + client.close(); + + for (int i = 0; i < 10; ++i) + { + if (server.isOpen()) + Thread.sleep(10); + else + break; + } + assertFalse(server.isOpen()); + } } - assertFalse(server.isOpen()); } @Test @@ -204,265 +205,265 @@ public class SocketChannelEndPointTest { assumeTrue("Scenario supports half-close", _scenario.supportsHalfCloses()); - Socket client = _scenario.newClient(_connector); - - client.setSoTimeout(500); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - - _manager.accept(server); - - // Write client to server - client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); - - // Verify echo server to client - for (char c : "HelloWorld".toCharArray()) + try (Socket client = _scenario.newClient(_connector)) { - int b = client.getInputStream().read(); - assertTrue(b > 0); - assertEquals(c, (char) b); - } + client.setSoTimeout(500); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); - // wait for read timeout - long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - try - { - client.getInputStream().read(); - Assert.fail(); - } - catch (SocketTimeoutException e) - { - assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start >= 400); - } + // Write client to server + client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); - // write then shutdown - client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); - client.shutdownOutput(); + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } - // Verify echo server to client - for (char c : "Goodbye Cruel TLS".toCharArray()) - { - int b = client.getInputStream().read(); - assertTrue(b > 0); - assertEquals(c, (char) b); + // wait for read timeout + long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + try + { + client.getInputStream().read(); + Assert.fail(); + } + catch (SocketTimeoutException e) + { + assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start >= 400); + } + + // write then shutdown + client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); + client.shutdownOutput(); + + // Verify echo server to client + for (char c : "Goodbye Cruel TLS".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } + + // Read close + assertEquals(-1, client.getInputStream().read()); + } } - - // Read close - assertEquals(-1, client.getInputStream().read()); } @Test public void testReadBlocked() throws Exception { - Socket client = _scenario.newClient(_connector); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - - _manager.accept(server); - - OutputStream clientOutputStream = client.getOutputStream(); - InputStream clientInputStream = client.getInputStream(); - - int specifiedTimeout = 1000; - client.setSoTimeout(specifiedTimeout); - - // Write 8 and cause block waiting for 10 - _blockAt.set(10); - clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8)); - clientOutputStream.flush(); - - Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); - _lastEndPoint.setIdleTimeout(10 * specifiedTimeout); - Thread.sleep((11 * specifiedTimeout) / 10); - - long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - try + try (Socket client = _scenario.newClient(_connector); + SocketChannel server = _connector.accept()) { - int b = clientInputStream.read(); - Assert.fail("Should have timed out waiting for a response, but read " + b); - } - catch (SocketTimeoutException e) - { - int elapsed = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start).intValue(); - Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4)); - } + server.configureBlocking(false); + _manager.accept(server); - // write remaining characters - clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8)); - clientOutputStream.flush(); + OutputStream clientOutputStream = client.getOutputStream(); + InputStream clientInputStream = client.getInputStream(); - // Verify echo server to client - for (char c : "1234567890ABCDEF".toCharArray()) - { - int b = clientInputStream.read(); - assertTrue(b > 0); - assertEquals(c, (char) b); + int specifiedTimeout = 1000; + client.setSoTimeout(specifiedTimeout); + + // Write 8 and cause block waiting for 10 + _blockAt.set(10); + clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8)); + clientOutputStream.flush(); + + Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + _lastEndPoint.setIdleTimeout(10 * specifiedTimeout); + Thread.sleep((11 * specifiedTimeout) / 10); + + long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + try + { + int b = clientInputStream.read(); + Assert.fail("Should have timed out waiting for a response, but read " + b); + } + catch (SocketTimeoutException e) + { + int elapsed = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - start).intValue(); + Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4)); + } + + // write remaining characters + clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8)); + clientOutputStream.flush(); + + // Verify echo server to client + for (char c : "1234567890ABCDEF".toCharArray()) + { + int b = clientInputStream.read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } } } @Test public void testStress() throws Exception { - Socket client = _scenario.newClient(_connector); - client.setSoTimeout(30000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - - _manager.accept(server); - final int writes = 200000; - - final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8); - byte[] count = "0\n".getBytes(StandardCharsets.UTF_8); - BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream()); - final CountDownLatch latch = new CountDownLatch(writes); - final InputStream in = new BufferedInputStream(client.getInputStream()); - final long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - out.write(bytes); - out.write(count); - out.flush(); - - Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); - _lastEndPoint.setIdleTimeout(5000); - - new Thread() + try (Socket client = _scenario.newClient(_connector)) { - @Override - public void run() + client.setSoTimeout(30000); + try (SocketChannel server = _connector.accept()) { - Thread.currentThread().setPriority(MAX_PRIORITY); - long last = -1; - int count = -1; - try - { - while (latch.getCount() > 0) - { - // Verify echo server to client - for (byte b0 : bytes) - { - int b = in.read(); - Assert.assertThat(b, greaterThan(0)); - assertEquals(0xff & b0, b); - } + server.configureBlocking(false); + _manager.accept(server); + final int writes = 200000; - count = 0; - int b = in.read(); - while (b > 0 && b != '\n') - { - count = count * 10 + (b - '0'); - b = in.read(); - } - last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - - //if (latch.getCount()%1000==0) - // System.out.println(writes-latch.getCount()); - - latch.countDown(); - } - } - catch (Throwable e) - { - - long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - System.err.println("count=" + count); - System.err.println("latch=" + latch.getCount()); - System.err.println("time=" + (now - start)); - System.err.println("last=" + (now - last)); - System.err.println("endp=" + _lastEndPoint); - System.err.println("conn=" + _lastEndPoint.getConnection()); - - e.printStackTrace(); - } - } - }.start(); - - // Write client to server - for (int i = 1; i < writes; i++) - { - out.write(bytes); - out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1)); - out.write('\n'); - if (i % 1000 == 0) - { - //System.err.println(i+"/"+writes); + final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8); + byte[] count = "0\n".getBytes(StandardCharsets.UTF_8); + BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream()); + final CountDownLatch latch = new CountDownLatch(writes); + final InputStream in = new BufferedInputStream(client.getInputStream()); + final long start = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + out.write(bytes); + out.write(count); out.flush(); + + Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); + _lastEndPoint.setIdleTimeout(5000); + + new Thread(() -> + { + Thread.currentThread().setPriority(Thread.MAX_PRIORITY); + long last = -1; + int count1 = -1; + try + { + while (latch.getCount() > 0) + { + // Verify echo server to client + for (byte b0 : bytes) + { + int b = in.read(); + Assert.assertThat(b, greaterThan(0)); + assertEquals(0xff & b0, b); + } + + count1 = 0; + int b = in.read(); + while (b > 0 && b != '\n') + { + count1 = count1 * 10 + (b - '0'); + b = in.read(); + } + last = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + + //if (latch.getCount()%1000==0) + // System.out.println(writes-latch.getCount()); + + latch.countDown(); + } + } + catch (Throwable e) + { + + long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + System.err.println("count=" + count1); + System.err.println("latch=" + latch.getCount()); + System.err.println("time=" + (now - start)); + System.err.println("last=" + (now - last)); + System.err.println("endp=" + _lastEndPoint); + System.err.println("conn=" + _lastEndPoint.getConnection()); + + e.printStackTrace(); + } + }).start(); + + // Write client to server + for (int i = 1; i < writes; i++) + { + out.write(bytes); + out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1)); + out.write('\n'); + if (i % 1000 == 0) + { + //System.err.println(i+"/"+writes); + out.flush(); + } + Thread.yield(); + } + out.flush(); + + long last = latch.getCount(); + while (!latch.await(5, TimeUnit.SECONDS)) + { + //System.err.println(latch.getCount()); + if (latch.getCount() == last) + Assert.fail(); + last = latch.getCount(); + } + + assertEquals(0, latch.getCount()); } - Thread.yield(); } - out.flush(); - - long last = latch.getCount(); - while (!latch.await(5, TimeUnit.SECONDS)) - { - //System.err.println(latch.getCount()); - if (latch.getCount() == last) - Assert.fail(); - last = latch.getCount(); - } - - assertEquals(0, latch.getCount()); } @Test public void testWriteBlocked() throws Exception { - Socket client = _scenario.newClient(_connector); - - client.setSoTimeout(10000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - - _manager.accept(server); - - // Write client to server - _writeCount.set(10000); - String data = "Now is the time for all good men to come to the aid of the party"; - client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8)); - BufferedInputStream in = new BufferedInputStream(client.getInputStream()); - - int byteNum = 0; - try + try (Socket client = _scenario.newClient(_connector)) { - for (int i = 0; i < _writeCount.get(); i++) + client.setSoTimeout(10000); + try (SocketChannel server = _connector.accept()) { - if (i % 1000 == 0) - TimeUnit.MILLISECONDS.sleep(200); + server.configureBlocking(false); + _manager.accept(server); - // Verify echo server to client - for (int j = 0; j < data.length(); j++) + // Write client to server + _writeCount.set(10000); + String data = "Now is the time for all good men to come to the aid of the party"; + client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8)); + BufferedInputStream in = new BufferedInputStream(client.getInputStream()); + + int byteNum = 0; + try { - char c = data.charAt(j); - int b = in.read(); - byteNum++; - assertTrue(b > 0); - assertEquals("test-" + i + "/" + j, c, (char) b); + for (int i = 0; i < _writeCount.get(); i++) + { + if (i % 1000 == 0) + TimeUnit.MILLISECONDS.sleep(200); + + // Verify echo server to client + for (int j = 0; j < data.length(); j++) + { + char c = data.charAt(j); + int b = in.read(); + byteNum++; + assertTrue(b > 0); + assertEquals("test-" + i + "/" + j, c, (char)b); + } + + if (i == 0) + _lastEndPoint.setIdleTimeout(60000); + } + } + catch (SocketTimeoutException e) + { + System.err.println("SelectorManager.dump() = " + _manager.dump()); + LOG.warn("Server: " + server); + LOG.warn("Error reading byte #" + byteNum, e); + throw e; } - if (i == 0) - _lastEndPoint.setIdleTimeout(60000); + client.close(); + + for (int i = 0; i < 10; ++i) + { + if (server.isOpen()) + Thread.sleep(10); + else + break; + } + assertFalse(server.isOpen()); } } - catch (SocketTimeoutException e) - { - System.err.println("SelectorManager.dump() = " + _manager.dump()); - LOG.warn("Server: " + server); - LOG.warn("Error reading byte #" + byteNum, e); - throw e; - } - - client.close(); - - for (int i = 0; i < 10; ++i) - { - if (server.isOpen()) - Thread.sleep(10); - else - break; - } - assertFalse(server.isOpen()); } @@ -482,7 +483,7 @@ public class SocketChannelEndPointTest { @Override - protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) { SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); _lastEndPoint = endp; @@ -491,7 +492,7 @@ public class SocketChannelEndPointTest } @Override - public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) { return new TestConnection(endpoint, latch, getExecutor(), _blockAt, _writeCount); } @@ -507,18 +508,14 @@ public class SocketChannelEndPointTest CountDownLatch closed = new CountDownLatch(20); for (int i = 0; i < 20; i++) { - new Thread() + new Thread(() -> { - @Override - public void run() + try (Socket client = _scenario.newClient(_connector)) { - try (Socket client = _scenario.newClient(_connector);) + client.setSoTimeout(5000); + try (SocketChannel server = _connector.accept()) { - client.setSoTimeout(5000); - - SocketChannel server = _connector.accept(); server.configureBlocking(false); - _manager.accept(server); // Write client to server @@ -531,26 +528,26 @@ public class SocketChannelEndPointTest { int b = client.getInputStream().read(); assertTrue(b > 0); - assertEquals(c, (char) b); + assertEquals(c, (char)b); } assertEquals(-1, client.getInputStream().read()); echoed.incrementAndGet(); } - catch (SocketTimeoutException x) - { - x.printStackTrace(); - timeout.incrementAndGet(); - } - catch (Throwable x) - { - rejections.incrementAndGet(); - } - finally - { - closed.countDown(); - } } - }.start(); + catch (SocketTimeoutException x) + { + x.printStackTrace(); + timeout.incrementAndGet(); + } + catch (Throwable x) + { + rejections.incrementAndGet(); + } + finally + { + closed.countDown(); + } + }).start(); } // unblock the handling @@ -572,25 +569,25 @@ public class SocketChannelEndPointTest try (Socket client = _scenario.newClient(_connector)) { client.setSoTimeout(5000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - - _manager.accept(server); - - // Write client to server - client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); - client.getOutputStream().flush(); - client.shutdownOutput(); - - // Verify echo server to client - for (char c : "HelloWorld".toCharArray()) + try (SocketChannel server = _connector.accept()) { - int b = client.getInputStream().read(); - assertTrue(b > 0); - assertEquals(c, (char) b); + server.configureBlocking(false); + _manager.accept(server); + + // Write client to server + client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); + client.getOutputStream().flush(); + client.shutdownOutput(); + + // Verify echo server to client + for (char c : "HelloWorld".toCharArray()) + { + int b = client.getInputStream().read(); + assertTrue(b > 0); + assertEquals(c, (char)b); + } + assertEquals(-1, client.getInputStream().read()); } - assertEquals(-1, client.getInputStream().read()); } } @@ -601,7 +598,7 @@ public class SocketChannelEndPointTest super(executor, scheduler); } - protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); endp.setIdleTimeout(60000); @@ -611,7 +608,7 @@ public class SocketChannelEndPointTest } @Override - public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) { return _scenario.newConnection(channel, endpoint, getExecutor(), _blockAt, _writeCount); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index 72d90192b34..23c60aeadc4 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -91,7 +91,7 @@ public class SslConnectionTest @Override - protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) { SocketChannelEndPoint endp = new TestEP(channel, selector, selectionKey, getScheduler()); endp.setIdleTimeout(60000); @@ -126,8 +126,7 @@ public class SslConnectionTest return false; } } - boolean flushed=super.flush(buffers); - return flushed; + return super.flush(buffers); } } @@ -186,15 +185,7 @@ public class SslConnectionTest fillInterested(); else { - getExecutor().execute(new Runnable() - { - - @Override - public void run() - { - getEndPoint().write(_writeCallback,BufferUtil.toBuffer("Hello Client")); - } - }); + getExecutor().execute(() -> getEndPoint().write(_writeCallback,BufferUtil.toBuffer("Hello Client"))); } } @@ -264,52 +255,54 @@ public class SslConnectionTest @Test public void testHelloWorld() throws Exception { - Socket client = newClient(); - client.setSoTimeout(60000); + try (Socket client = newClient()) + { + client.setSoTimeout(60000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); + client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); + byte[] buffer = new byte[1024]; + int len = client.getInputStream().read(buffer); + Assert.assertEquals(5, len); + Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8)); - client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); - byte[] buffer = new byte[1024]; - int len=client.getInputStream().read(buffer); - Assert.assertEquals(5, len); - Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8)); - - _dispatches.set(0); - client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); - len=5; - while(len>0) - len-=client.getInputStream().read(buffer); - - client.close(); + _dispatches.set(0); + client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); + len = 5; + while (len > 0) + len -= client.getInputStream().read(buffer); + } + } } @Test public void testRenegotiate() throws Exception { - SSLSocket client = newClient(); - client.setSoTimeout(60000); + try (SSLSocket client = newClient()) + { + client.setSoTimeout(60000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); + client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); + byte[] buffer = new byte[1024]; + int len = client.getInputStream().read(buffer); + Assert.assertEquals(5, len); + Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8)); - client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); - byte[] buffer = new byte[1024]; - int len=client.getInputStream().read(buffer); - Assert.assertEquals(5, len); - Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8)); + client.startHandshake(); - client.startHandshake(); - - client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); - len=client.getInputStream().read(buffer); - Assert.assertEquals(5, len); - Assert.assertEquals("World",new String(buffer,0,len,StandardCharsets.UTF_8)); - - client.close(); + client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); + len = client.getInputStream().read(buffer); + Assert.assertEquals(5, len); + Assert.assertEquals("World", new String(buffer, 0, len, StandardCharsets.UTF_8)); + } + } } @Test @@ -317,30 +310,33 @@ public class SslConnectionTest { __sslCtxFactory.setRenegotiationAllowed(false); - SSLSocket client = newClient(); - client.setSoTimeout(60000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); - - client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); - byte[] buffer = new byte[1024]; - int len=client.getInputStream().read(buffer); - Assert.assertEquals(5, len); - Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8)); - - client.startHandshake(); - - client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); - try + try (SSLSocket client = newClient()) { - client.getInputStream().read(buffer); - Assert.fail(); - } - catch(SSLException e) - { - // expected + client.setSoTimeout(60000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); + + client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); + byte[] buffer = new byte[1024]; + int len = client.getInputStream().read(buffer); + Assert.assertEquals(5, len); + Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8)); + + client.startHandshake(); + + client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); + try + { + client.getInputStream().read(buffer); + Assert.fail(); + } + catch (SSLException e) + { + // expected + } + } } } @@ -350,152 +346,149 @@ public class SslConnectionTest __sslCtxFactory.setRenegotiationAllowed(true); __sslCtxFactory.setRenegotiationLimit(2); - SSLSocket client = newClient(); - client.setSoTimeout(60000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); - - client.getOutputStream().write("Good".getBytes(StandardCharsets.UTF_8)); - byte[] buffer = new byte[1024]; - int len=client.getInputStream().read(buffer); - Assert.assertEquals(4, len); - Assert.assertEquals("Good",new String(buffer,0,len,StandardCharsets.UTF_8)); - - client.startHandshake(); - - client.getOutputStream().write("Bye".getBytes(StandardCharsets.UTF_8)); - len=client.getInputStream().read(buffer); - Assert.assertEquals(3, len); - Assert.assertEquals("Bye",new String(buffer,0,len,StandardCharsets.UTF_8)); - - client.startHandshake(); - - client.getOutputStream().write("Cruel".getBytes(StandardCharsets.UTF_8)); - len=client.getInputStream().read(buffer); - Assert.assertEquals(5, len); - Assert.assertEquals("Cruel",new String(buffer,0,len,StandardCharsets.UTF_8)); - - client.startHandshake(); - - client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); - try + try (SSLSocket client = newClient()) { - client.getInputStream().read(buffer); - Assert.fail(); - } - catch(SSLException e) - { - // expected + client.setSoTimeout(60000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); + + client.getOutputStream().write("Good".getBytes(StandardCharsets.UTF_8)); + byte[] buffer = new byte[1024]; + int len = client.getInputStream().read(buffer); + Assert.assertEquals(4, len); + Assert.assertEquals("Good", new String(buffer, 0, len, StandardCharsets.UTF_8)); + + client.startHandshake(); + + client.getOutputStream().write("Bye".getBytes(StandardCharsets.UTF_8)); + len = client.getInputStream().read(buffer); + Assert.assertEquals(3, len); + Assert.assertEquals("Bye", new String(buffer, 0, len, StandardCharsets.UTF_8)); + + client.startHandshake(); + + client.getOutputStream().write("Cruel".getBytes(StandardCharsets.UTF_8)); + len = client.getInputStream().read(buffer); + Assert.assertEquals(5, len); + Assert.assertEquals("Cruel", new String(buffer, 0, len, StandardCharsets.UTF_8)); + + client.startHandshake(); + + client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); + try + { + client.getInputStream().read(buffer); + Assert.fail(); + } + catch (SSLException e) + { + // expected + } + } } } - - @Test public void testWriteOnConnect() throws Exception { _testFill=false; - _writeCallback = new FutureCallback(); - Socket client = newClient(); - client.setSoTimeout(10000); - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); + try (Socket client = newClient()) + { + client.setSoTimeout(10000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); - byte[] buffer = new byte[1024]; - int len=client.getInputStream().read(buffer); - Assert.assertEquals("Hello Client",new String(buffer,0,len,StandardCharsets.UTF_8)); - Assert.assertEquals(null,_writeCallback.get(100,TimeUnit.MILLISECONDS)); - client.close(); + byte[] buffer = new byte[1024]; + int len = client.getInputStream().read(buffer); + Assert.assertEquals("Hello Client", new String(buffer, 0, len, StandardCharsets.UTF_8)); + Assert.assertNull(_writeCallback.get(100, TimeUnit.MILLISECONDS)); + } + } } - - @Test public void testBlockedWrite() throws Exception { - Socket client = newClient(); - client.setSoTimeout(5000); + try (Socket client = newClient()) + { + client.setSoTimeout(5000); + try (SocketChannel server = _connector.accept()) + { + server.configureBlocking(false); + _manager.accept(server); - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); + __startBlocking.set(5); + __blockFor.set(3); - __startBlocking.set(5); - __blockFor.set(3); - - client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); - byte[] buffer = new byte[1024]; - int len=client.getInputStream().read(buffer); - Assert.assertEquals(5, len); - Assert.assertEquals("Hello",new String(buffer,0,len,StandardCharsets.UTF_8)); + client.getOutputStream().write("Hello".getBytes(StandardCharsets.UTF_8)); + byte[] buffer = new byte[1024]; + int len = client.getInputStream().read(buffer); + Assert.assertEquals(5, len); + Assert.assertEquals("Hello", new String(buffer, 0, len, StandardCharsets.UTF_8)); - _dispatches.set(0); - client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); - len=5; - while(len>0) - len-=client.getInputStream().read(buffer); - Assert.assertEquals(0, len); - client.close(); + _dispatches.set(0); + client.getOutputStream().write("World".getBytes(StandardCharsets.UTF_8)); + len = 5; + while (len > 0) + len -= client.getInputStream().read(buffer); + Assert.assertEquals(0, len); + } + } } @Test public void testManyLines() throws Exception { - final Socket client = newClient(); - client.setSoTimeout(10000); - - SocketChannel server = _connector.accept(); - server.configureBlocking(false); - _manager.accept(server); - - final int LINES=20; - final CountDownLatch count=new CountDownLatch(LINES); - - - new Thread() + try (Socket client = newClient()) { - @Override - public void run() + client.setSoTimeout(10000); + try (SocketChannel server = _connector.accept()) { - try + server.configureBlocking(false); + _manager.accept(server); + + final int LINES = 20; + final CountDownLatch count = new CountDownLatch(LINES); + + new Thread(() -> { - BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(),StandardCharsets.UTF_8)); - while(count.getCount()>0) + try { - String line=in.readLine(); - if (line==null) - break; - // System.err.println(line); - count.countDown(); + BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8)); + while (count.getCount() > 0) + { + String line = in.readLine(); + if (line == null) + break; + // System.err.println(line); + count.countDown(); + } + } + catch (IOException e) + { + e.printStackTrace(); + } + }).start(); + + for (int i = 0; i < LINES; i++) + { + client.getOutputStream().write(("HelloWorld " + i + "\n").getBytes(StandardCharsets.UTF_8)); + // System.err.println("wrote"); + if (i % 1000 == 0) + { + client.getOutputStream().flush(); + Thread.sleep(10); } } - catch(IOException e) - { - e.printStackTrace(); - } - } - }.start(); - for (int i=0;i