diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java new file mode 100644 index 00000000000..cc93df28d29 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java @@ -0,0 +1,526 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.util.FormContentProvider; +import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.NetworkTrafficListener; +import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.NetworkTrafficServerConnector; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Fields; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class NetworkTrafficListenerTest +{ + private static final String END_OF_CONTENT = "~"; + + private Server server; + private NetworkTrafficServerConnector connector; + private NetworkTrafficHttpClient client; + + private void start(Handler handler) throws Exception + { + startServer(handler); + startClient(); + } + + private void startServer(Handler handler) throws Exception + { + server = new Server(); + connector = new NetworkTrafficServerConnector(server); + connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false); + connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + + private void startClient() throws Exception + { + client = new NetworkTrafficHttpClient(new ArrayList<>()); + client.start(); + } + + @AfterEach + public void dispose() throws Exception + { + if (client != null) + client.stop(); + if (server != null) + server.stop(); + } + + @Test + public void testOpenedClosedAreInvoked() throws Exception + { + startServer(null); + + CountDownLatch openedLatch = new CountDownLatch(1); + CountDownLatch closedLatch = new CountDownLatch(1); + connector.addNetworkTrafficListener(new NetworkTrafficListener() + { + public volatile Socket socket; + + @Override + public void opened(Socket socket) + { + this.socket = socket; + openedLatch.countDown(); + } + + @Override + public void closed(Socket socket) + { + if (this.socket == socket) + closedLatch.countDown(); + } + }); + int port = connector.getLocalPort(); + + // Connect to the server + try (Socket ignored = new Socket("localhost", port)) + { + assertTrue(openedLatch.await(10, TimeUnit.SECONDS)); + } + assertTrue(closedLatch.await(10, TimeUnit.SECONDS)); + } + + @Test + public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) + { + request.setHandled(true); + } + }); + + AtomicReference serverIncoming = new AtomicReference<>(""); + CountDownLatch serverIncomingLatch = new CountDownLatch(1); + AtomicReference serverOutgoing = new AtomicReference<>(""); + CountDownLatch serverOutgoingLatch = new CountDownLatch(1); + connector.addNetworkTrafficListener(new NetworkTrafficListener() + { + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverIncomingLatch.countDown(); + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverOutgoingLatch.countDown(); + } + }); + + AtomicReference clientIncoming = new AtomicReference<>(""); + CountDownLatch clientIncomingLatch = new CountDownLatch(1); + AtomicReference clientOutgoing = new AtomicReference<>(""); + CountDownLatch clientOutgoingLatch = new CountDownLatch(1); + client.listeners.add(new NetworkTrafficListener() + { + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientOutgoingLatch.countDown(); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientIncomingLatch.countDown(); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()) + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS)); + assertEquals(clientOutgoing.get(), serverIncoming.get()); + assertEquals(serverOutgoing.get(), clientIncoming.get()); + } + + @Test + public void testTrafficWithResponseContentOnPersistentConnection() throws Exception + { + String responseContent = "response_content" + END_OF_CONTENT; + start(new AbstractHandler() + { + @Override + public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException + { + request.setHandled(true); + ServletOutputStream output = servletResponse.getOutputStream(); + output.write(responseContent.getBytes(StandardCharsets.UTF_8)); + } + }); + + AtomicReference serverIncoming = new AtomicReference<>(""); + CountDownLatch serverIncomingLatch = new CountDownLatch(1); + AtomicReference serverOutgoing = new AtomicReference<>(""); + CountDownLatch serverOutgoingLatch = new CountDownLatch(1); + connector.addNetworkTrafficListener(new NetworkTrafficListener() + { + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverIncomingLatch.countDown(); + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverOutgoingLatch.countDown(); + } + }); + + AtomicReference clientIncoming = new AtomicReference<>(""); + CountDownLatch clientIncomingLatch = new CountDownLatch(1); + AtomicReference clientOutgoing = new AtomicReference<>(""); + CountDownLatch clientOutgoingLatch = new CountDownLatch(1); + client.listeners.add(new NetworkTrafficListener() + { + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientOutgoingLatch.countDown(); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientIncomingLatch.countDown(); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()).send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertEquals(responseContent, response.getContentAsString()); + + assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS)); + assertEquals(clientOutgoing.get(), serverIncoming.get()); + assertEquals(serverOutgoing.get(), clientIncoming.get()); + } + + @Test + public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception + { + String responseContent = "response_content"; + String responseChunk1 = responseContent.substring(0, responseContent.length() / 2); + String responseChunk2 = responseContent.substring(responseContent.length() / 2); + start(new AbstractHandler() + { + @Override + public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException + { + request.setHandled(true); + ServletOutputStream output = servletResponse.getOutputStream(); + output.write(responseChunk1.getBytes(StandardCharsets.UTF_8)); + output.flush(); + output.write(responseChunk2.getBytes(StandardCharsets.UTF_8)); + output.flush(); + } + }); + + AtomicReference serverIncoming = new AtomicReference<>(""); + CountDownLatch serverIncomingLatch = new CountDownLatch(1); + AtomicReference serverOutgoing = new AtomicReference<>(""); + CountDownLatch serverOutgoingLatch = new CountDownLatch(1); + connector.addNetworkTrafficListener(new NetworkTrafficListener() + { + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverIncomingLatch.countDown(); + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + if (serverOutgoing.get().endsWith("\r\n0\r\n\r\n")) + serverOutgoingLatch.countDown(); + } + }); + + AtomicReference clientIncoming = new AtomicReference<>(""); + CountDownLatch clientIncomingLatch = new CountDownLatch(1); + AtomicReference clientOutgoing = new AtomicReference<>(""); + CountDownLatch clientOutgoingLatch = new CountDownLatch(1); + client.listeners.add(new NetworkTrafficListener() + { + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientOutgoingLatch.countDown(); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + if (clientIncoming.get().endsWith("\r\n0\r\n\r\n")) + clientIncomingLatch.countDown(); + } + }); + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()).send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS)); + assertEquals(clientOutgoing.get(), serverIncoming.get()); + assertEquals(serverOutgoing.get(), clientIncoming.get()); + } + + @Test + public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception + { + String location = "/redirect"; + start(new AbstractHandler() + { + @Override + public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException + { + request.setHandled(true); + servletResponse.sendRedirect(location); + } + }); + + AtomicReference serverIncoming = new AtomicReference<>(""); + CountDownLatch serverIncomingLatch = new CountDownLatch(1); + AtomicReference serverOutgoing = new AtomicReference<>(""); + CountDownLatch serverOutgoingLatch = new CountDownLatch(1); + connector.addNetworkTrafficListener(new NetworkTrafficListener() + { + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverIncomingLatch.countDown(); + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverOutgoingLatch.countDown(); + } + }); + + AtomicReference clientIncoming = new AtomicReference<>(""); + CountDownLatch clientIncomingLatch = new CountDownLatch(1); + AtomicReference clientOutgoing = new AtomicReference<>(""); + CountDownLatch clientOutgoingLatch = new CountDownLatch(1); + client.listeners.add(new NetworkTrafficListener() + { + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientOutgoingLatch.countDown(); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientIncomingLatch.countDown(); + } + }); + + client.setFollowRedirects(false); + Fields fields = new Fields(); + fields.put("a", "1"); + fields.put("b", "2"); + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .content(new FormContentProvider(fields)) + .send(); + assertEquals(HttpStatus.FOUND_302, response.getStatus()); + + assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS)); + assertEquals(clientOutgoing.get(), serverIncoming.get()); + assertEquals(serverOutgoing.get(), clientIncoming.get()); + } + + @Test + public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException + { + // Read and discard the request body to make the test more + // reliable, otherwise there is a race between request body + // upload and response download + InputStream input = servletRequest.getInputStream(); + byte[] buffer = new byte[4096]; + while (true) + { + int read = input.read(buffer); + if (read < 0) + break; + } + request.setHandled(true); + } + }); + + AtomicReference serverIncoming = new AtomicReference<>(""); + AtomicReference serverOutgoing = new AtomicReference<>(""); + CountDownLatch serverOutgoingLatch = new CountDownLatch(1); + connector.addNetworkTrafficListener(new NetworkTrafficListener() + { + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + serverOutgoingLatch.countDown(); + } + }); + + AtomicReference clientIncoming = new AtomicReference<>(""); + CountDownLatch clientIncomingLatch = new CountDownLatch(1); + AtomicReference clientOutgoing = new AtomicReference<>(""); + client.listeners.add(new NetworkTrafficListener() + { + @Override + public void outgoing(Socket socket, ByteBuffer bytes) + { + clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) + { + clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); + clientIncomingLatch.countDown(); + } + }); + + // Generate a large request content. + String requestContent = "0123456789ABCDEF"; + for (int i = 0; i < 16; ++i) + { + requestContent += requestContent; + } + + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .content(new StringContentProvider(requestContent)) + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS)); + assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS)); + assertEquals(clientOutgoing.get(), serverIncoming.get()); + assertTrue(clientOutgoing.get().length() > requestContent.length()); + assertEquals(serverOutgoing.get(), clientIncoming.get()); + } + + private static class NetworkTrafficHttpClient extends HttpClient + { + private final List listeners; + + private NetworkTrafficHttpClient(List listeners) + { + super(new HttpClientTransportOverHTTP() + { + @Override + protected SelectorManager newSelectorManager(HttpClient client) + { + return new AbstractConnectorHttpClientTransport.ClientSelectorManager(client, getSelectors()) + { + @Override + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) + { + return new NetworkTrafficSocketChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout(), listeners); + } + }; + } + }, null); + this.listeners = listeners; + } + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficListener.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficListener.java index 92075bda28f..f2897ba2398 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficListener.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficListener.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; /** *

A listener for raw network traffic within Jetty.

*

{@link NetworkTrafficListener}s can be installed in a - * org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector, + * {@code org.eclipse.jetty.server.NetworkTrafficServerConnector}, * and are notified of the following network traffic events:

*
    *
  • Connection opened, when the server has accepted the connection from a remote client
  • @@ -45,7 +45,9 @@ public interface NetworkTrafficListener * * @param socket the socket associated with the remote client */ - void opened(Socket socket); + default void opened(Socket socket) + { + } /** *

    Callback method invoked when bytes sent by a remote client arrived on the server.

    @@ -53,7 +55,9 @@ public interface NetworkTrafficListener * @param socket the socket associated with the remote client * @param bytes the read-only buffer containing the incoming bytes */ - void incoming(Socket socket, ByteBuffer bytes); + default void incoming(Socket socket, ByteBuffer bytes) + { + } /** *

    Callback method invoked when bytes are sent to a remote client from the server.

    @@ -62,7 +66,9 @@ public interface NetworkTrafficListener * @param socket the socket associated with the remote client * @param bytes the read-only buffer containing the outgoing bytes */ - void outgoing(Socket socket, ByteBuffer bytes); + default void outgoing(Socket socket, ByteBuffer bytes) + { + } /** *

    Callback method invoked when a connection to a remote client has been closed.

    @@ -74,11 +80,15 @@ public interface NetworkTrafficListener * * @param socket the (closed) socket associated with the remote client */ - void closed(Socket socket); + default void closed(Socket socket) + { + } /** *

    A commodity class that implements {@link NetworkTrafficListener} with empty methods.

    + * @deprecated use {@link NetworkTrafficListener} instead */ + @Deprecated class Adapter implements NetworkTrafficListener { @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java index c403df4aa68..375bc6477d4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java @@ -18,133 +18,20 @@ package org.eclipse.jetty.io; -import java.io.IOException; -import java.net.Socket; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; -public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint +/** + * @deprecated use {@link NetworkTrafficSocketChannelEndPoint} instead + */ +@Deprecated +public class NetworkTrafficSelectChannelEndPoint extends NetworkTrafficSocketChannelEndPoint { - private static final Logger LOG = Log.getLogger(NetworkTrafficSelectChannelEndPoint.class); - - private final List listeners; - - public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List listeners) throws IOException + public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List listeners) { - super(channel, selectSet, key, scheduler, idleTimeout); - this.listeners = listeners; - } - - @Override - public int fill(ByteBuffer buffer) throws IOException - { - int read = super.fill(buffer); - notifyIncoming(buffer, read); - return read; - } - - @Override - public boolean flush(ByteBuffer... buffers) throws IOException - { - boolean flushed = true; - for (ByteBuffer b : buffers) - { - if (b.hasRemaining()) - { - int position = b.position(); - ByteBuffer view = b.slice(); - flushed &= super.flush(b); - int l = b.position() - position; - view.limit(view.position() + l); - notifyOutgoing(view); - if (!flushed) - break; - } - } - return flushed; - } - - @Override - public void onOpen() - { - super.onOpen(); - if (listeners != null && !listeners.isEmpty()) - { - for (NetworkTrafficListener listener : listeners) - { - try - { - listener.opened(getSocket()); - } - catch (Exception x) - { - LOG.warn(x); - } - } - } - } - - @Override - public void onClose() - { - super.onClose(); - if (listeners != null && !listeners.isEmpty()) - { - for (NetworkTrafficListener listener : listeners) - { - try - { - listener.closed(getSocket()); - } - catch (Exception x) - { - LOG.warn(x); - } - } - } - } - - public void notifyIncoming(ByteBuffer buffer, int read) - { - if (listeners != null && !listeners.isEmpty() && read > 0) - { - for (NetworkTrafficListener listener : listeners) - { - try - { - ByteBuffer view = buffer.asReadOnlyBuffer(); - listener.incoming(getSocket(), view); - } - catch (Exception x) - { - LOG.warn(x); - } - } - } - } - - public void notifyOutgoing(ByteBuffer view) - { - if (listeners != null && !listeners.isEmpty() && view.hasRemaining()) - { - Socket socket = getSocket(); - for (NetworkTrafficListener listener : listeners) - { - try - { - listener.outgoing(socket, view); - } - catch (Exception x) - { - LOG.warn(x); - } - } - } + super(channel, selectSet, key, scheduler, idleTimeout, listeners); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java new file mode 100644 index 00000000000..f8d74e0acfc --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSocketChannelEndPoint.java @@ -0,0 +1,136 @@ +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.util.List; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +/** + *

    A specialized version of {@link SocketChannelEndPoint} that supports {@link NetworkTrafficListener}s.

    + */ +public class NetworkTrafficSocketChannelEndPoint extends SocketChannelEndPoint +{ + private static final Logger LOG = Log.getLogger(NetworkTrafficSocketChannelEndPoint.class); + + private final List listeners; + + public NetworkTrafficSocketChannelEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List listeners) + { + super(channel, selectSet, key, scheduler); + setIdleTimeout(idleTimeout); + this.listeners = listeners; + } + + @Override + public int fill(ByteBuffer buffer) throws IOException + { + int read = super.fill(buffer); + notifyIncoming(buffer, read); + return read; + } + + @Override + public boolean flush(ByteBuffer... buffers) throws IOException + { + boolean flushed = true; + for (ByteBuffer b : buffers) + { + if (b.hasRemaining()) + { + int position = b.position(); + ByteBuffer view = b.slice(); + flushed = super.flush(b); + int l = b.position() - position; + view.limit(view.position() + l); + notifyOutgoing(view); + if (!flushed) + break; + } + } + return flushed; + } + + @Override + public void onOpen() + { + super.onOpen(); + if (listeners != null && !listeners.isEmpty()) + { + for (NetworkTrafficListener listener : listeners) + { + try + { + listener.opened(getSocket()); + } + catch (Exception x) + { + LOG.warn(x); + } + } + } + } + + @Override + public void onClose() + { + super.onClose(); + if (listeners != null && !listeners.isEmpty()) + { + for (NetworkTrafficListener listener : listeners) + { + try + { + listener.closed(getSocket()); + } + catch (Exception x) + { + LOG.warn(x); + } + } + } + } + + public void notifyIncoming(ByteBuffer buffer, int read) + { + if (listeners != null && !listeners.isEmpty() && read > 0) + { + for (NetworkTrafficListener listener : listeners) + { + try + { + ByteBuffer view = buffer.asReadOnlyBuffer(); + listener.incoming(getSocket(), view); + } + catch (Exception x) + { + LOG.warn(x); + } + } + } + } + + public void notifyOutgoing(ByteBuffer view) + { + if (listeners != null && !listeners.isEmpty() && view.hasRemaining()) + { + Socket socket = getSocket(); + for (NetworkTrafficListener listener : listeners) + { + try + { + listener.outgoing(socket, view); + } + catch (Exception x) + { + LOG.warn(x); + } + } + } + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java index dab17e501b0..4c635be66ed 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.server; -import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; @@ -29,7 +28,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.NetworkTrafficListener; -import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint; +import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Scheduler; @@ -84,9 +83,8 @@ public class NetworkTrafficServerConnector extends ServerConnector } @Override - protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) { - NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners); - return endPoint; + return new NetworkTrafficSocketChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners); } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java deleted file mode 100644 index a64b7569466..00000000000 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java +++ /dev/null @@ -1,484 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.server; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import javax.servlet.ServletException; -import javax.servlet.ServletOutputStream; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.eclipse.jetty.io.NetworkTrafficListener; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.util.BufferUtil; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class NetworkTrafficListenerTest -{ - private static final byte END_OF_CONTENT = '~'; - - private Server server; - private NetworkTrafficServerConnector connector; - - public void initConnector(Handler handler) throws Exception - { - server = new Server(); - - connector = new NetworkTrafficServerConnector(server); - connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false); - connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false); - server.addConnector(connector); - server.setHandler(handler); - server.start(); - } - - @AfterEach - public void destroyConnector() throws Exception - { - if (server != null) - { - server.stop(); - server.join(); - } - } - - @Test - public void testOpenedClosedAreInvoked() throws Exception - { - initConnector(null); - - final CountDownLatch openedLatch = new CountDownLatch(1); - final CountDownLatch closedLatch = new CountDownLatch(1); - connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() - { - public volatile Socket socket; - - @Override - public void opened(Socket socket) - { - this.socket = socket; - openedLatch.countDown(); - } - - @Override - public void closed(Socket socket) - { - if (this.socket == socket) - closedLatch.countDown(); - } - }); - int port = connector.getLocalPort(); - - // Connect to the server - Socket socket = new Socket("localhost", port); - assertTrue(openedLatch.await(10, TimeUnit.SECONDS)); - - socket.close(); - assertTrue(closedLatch.await(10, TimeUnit.SECONDS)); - } - - @Test - public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception - { - initConnector(new AbstractHandler() - { - @Override - public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException - { - request.setHandled(true); - } - }); - - final AtomicReference incomingData = new AtomicReference<>(); - final CountDownLatch incomingLatch = new CountDownLatch(1); - final AtomicReference outgoingData = new AtomicReference<>(""); - final CountDownLatch outgoingLatch = new CountDownLatch(1); - connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() - { - @Override - public void incoming(Socket socket, ByteBuffer bytes) - { - incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - incomingLatch.countDown(); - } - - @Override - public void outgoing(Socket socket, ByteBuffer bytes) - { - outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - outgoingLatch.countDown(); - } - }); - int port = connector.getLocalPort(); - - String request = - "GET / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "Connection: close\r\n" + - "\r\n"; - String expectedResponse = - "HTTP/1.1 200 OK\r\n" + - "Connection: close\r\n" + - "\r\n"; - - Socket socket = new Socket("localhost", port); - OutputStream output = socket.getOutputStream(); - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); - - assertTrue(incomingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(request, incomingData.get()); - - assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(expectedResponse, outgoingData.get()); - - byte[] responseBytes = readResponse(socket); - String response = new String(responseBytes, StandardCharsets.UTF_8); - assertEquals(expectedResponse, response); - - socket.close(); - } - - @Test - public void testTrafficWithResponseContentOnPersistentConnection() throws Exception - { - final String responseContent = "response_content"; - initConnector(new AbstractHandler() - { - @Override - public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException - { - request.setHandled(true); - ServletOutputStream output = servletResponse.getOutputStream(); - output.write(responseContent.getBytes(StandardCharsets.UTF_8)); - output.write(END_OF_CONTENT); - } - }); - - final AtomicReference incomingData = new AtomicReference<>(); - final CountDownLatch incomingLatch = new CountDownLatch(1); - final AtomicReference outgoingData = new AtomicReference<>(""); - final CountDownLatch outgoingLatch = new CountDownLatch(2); - connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() - { - @Override - public void incoming(Socket socket, ByteBuffer bytes) - { - incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - incomingLatch.countDown(); - } - - @Override - public void outgoing(Socket socket, ByteBuffer bytes) - { - outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - outgoingLatch.countDown(); - } - }); - int port = connector.getLocalPort(); - - String request = - "GET / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "\r\n"; - String expectedResponse = - "HTTP/1.1 200 OK\r\n" + - "Content-Length: " + (responseContent.length() + 1) + "\r\n" + - "\r\n" + - "" + responseContent + (char)END_OF_CONTENT; - - Socket socket = new Socket("localhost", port); - OutputStream output = socket.getOutputStream(); - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); - - assertTrue(incomingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(request, incomingData.get()); - - assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(expectedResponse, outgoingData.get()); - - byte[] responseBytes = readResponse(socket); - String response = new String(responseBytes, StandardCharsets.UTF_8); - assertEquals(expectedResponse, response); - - socket.close(); - } - - @Test - public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception - { - final String responseContent = "response_content"; - final String responseChunk1 = "response_content".substring(0, responseContent.length() / 2); - final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length()); - initConnector(new AbstractHandler() - { - @Override - public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException - { - request.setHandled(true); - ServletOutputStream output = servletResponse.getOutputStream(); - output.write(responseChunk1.getBytes(StandardCharsets.UTF_8)); - output.flush(); - output.write(responseChunk2.getBytes(StandardCharsets.UTF_8)); - output.flush(); - } - }); - - final AtomicReference incomingData = new AtomicReference<>(); - final CountDownLatch incomingLatch = new CountDownLatch(1); - final AtomicReference outgoingData = new AtomicReference<>(""); - final CountDownLatch outgoingLatch = new CountDownLatch(1); - connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() - { - @Override - public void incoming(Socket socket, ByteBuffer bytes) - { - incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - incomingLatch.countDown(); - } - - @Override - public void outgoing(Socket socket, ByteBuffer bytes) - { - outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - if (outgoingData.get().endsWith("\r\n0\r\n\r\n")) - outgoingLatch.countDown(); - } - }); - int port = connector.getLocalPort(); - - String request = - "GET / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "\r\n"; - String expectedResponse = - "HTTP/1.1 200 OK\r\n" + - "Transfer-Encoding: chunked\r\n" + - "\r\n" + - responseChunk1.length() + "\r\n" + - responseChunk1 + "\r\n" + - responseChunk2.length() + "\r\n" + - responseChunk2 + "\r\n" + - "0\r\n" + - "\r\n"; - - Socket socket = new Socket("localhost", port); - OutputStream output = socket.getOutputStream(); - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); - - assertTrue(incomingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(request, incomingData.get()); - - assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(expectedResponse, outgoingData.get()); - - byte[] responseBytes = readResponse(socket); - String response = new String(responseBytes, StandardCharsets.UTF_8); - assertEquals(expectedResponse, response); - - socket.close(); - } - - @Test - public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception - { - final String location = "/redirect"; - initConnector(new AbstractHandler() - { - @Override - public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException - { - request.setHandled(true); - servletResponse.sendRedirect(location); - } - }); - - final AtomicReference incomingData = new AtomicReference<>(); - final CountDownLatch incomingLatch = new CountDownLatch(1); - final AtomicReference outgoingData = new AtomicReference<>(""); - final CountDownLatch outgoingLatch = new CountDownLatch(1); - connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() - { - @Override - public void incoming(Socket socket, ByteBuffer bytes) - { - incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - incomingLatch.countDown(); - } - - @Override - public void outgoing(Socket socket, ByteBuffer bytes) - { - outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - outgoingLatch.countDown(); - } - }); - int port = connector.getLocalPort(); - - String requestContent = "a=1&b=2"; - String request = - "POST / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "Content-Type: application/x-www-form-urlencoded\r\n" + - "Content-Length: " + requestContent.length() + "\r\n" + - "\r\n" + - requestContent; - String expectedResponse = - "HTTP/1.1 302 Found\r\n" + - "Location: http://localhost:" + port + location + "\r\n" + - "Content-Length: 0\r\n" + - "\r\n"; - - Socket socket = new Socket("localhost", port); - OutputStream output = socket.getOutputStream(); - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); - - assertTrue(incomingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(request, incomingData.get()); - - assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(expectedResponse, outgoingData.get()); - - byte[] responseBytes = readResponse(socket); - String response = new String(responseBytes, StandardCharsets.UTF_8); - assertEquals(expectedResponse, response); - - socket.close(); - } - - @Test - public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception - { - initConnector(new AbstractHandler() - { - @Override - public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException - { - // Read and discard the request body to make the test more - // reliable, otherwise there is a race between request body - // upload and response download - InputStream input = servletRequest.getInputStream(); - byte[] buffer = new byte[4096]; - while (true) - { - int read = input.read(buffer); - if (read < 0) - break; - } - request.setHandled(true); - } - }); - - final AtomicReference incomingData = new AtomicReference<>(""); - final AtomicReference outgoingData = new AtomicReference<>(""); - final CountDownLatch outgoingLatch = new CountDownLatch(1); - connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() - { - @Override - public void incoming(Socket socket, ByteBuffer bytes) - { - incomingData.set(incomingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - } - - @Override - public void outgoing(Socket socket, ByteBuffer bytes) - { - outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8)); - outgoingLatch.countDown(); - } - }); - int port = connector.getLocalPort(); - - // Generate 32 KiB of request content - String requestContent = "0123456789ABCDEF"; - for (int i = 0; i < 11; ++i) - { - requestContent += requestContent; - } - String request = - "POST / HTTP/1.1\r\n" + - "Host: localhost:" + port + "\r\n" + - "Content-Type: text/plain\r\n" + - "Content-Length: " + requestContent.length() + "\r\n" + - "\r\n" + - requestContent; - String expectedResponse = - "HTTP/1.1 200 OK\r\n" + - "Content-Length: 0\r\n" + - "\r\n"; - - Socket socket = new Socket("localhost", port); - OutputStream output = socket.getOutputStream(); - output.write(request.getBytes(StandardCharsets.UTF_8)); - output.flush(); - - assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS)); - assertEquals(expectedResponse, outgoingData.get()); - - byte[] responseBytes = readResponse(socket); - String response = new String(responseBytes, StandardCharsets.UTF_8); - assertEquals(expectedResponse, response); - - assertEquals(request, incomingData.get()); - - socket.close(); - } - - private byte[] readResponse(Socket socket) throws IOException - { - socket.setSoTimeout(5000); - InputStream input = socket.getInputStream(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - int read; - while ((read = input.read()) >= 0) - { - baos.write(read); - - // Handle non-chunked end of response - if (read == END_OF_CONTENT) - break; - - // Handle chunked end of response - String response = baos.toString("UTF-8"); - if (response.endsWith("\r\n0\r\n\r\n")) - break; - - // Handle non-content responses - if (response.contains("Content-Length: 0") && response.endsWith("\r\n\r\n")) - break; - } - return baos.toByteArray(); - } -}