Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-04-07 14:54:09 +02:00
commit e1be62c978
7 changed files with 590 additions and 606 deletions

View File

@ -0,0 +1,525 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.FormRequestContent;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.NetworkTrafficServerConnector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Fields;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class NetworkTrafficListenerTest
{
private static final String END_OF_CONTENT = "~";
private Server server;
private NetworkTrafficServerConnector connector;
private NetworkTrafficHttpClient client;
private void start(Handler handler) throws Exception
{
startServer(handler);
startClient();
}
private void startServer(Handler handler) throws Exception
{
server = new Server();
connector = new NetworkTrafficServerConnector(server);
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false);
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
private void startClient() throws Exception
{
client = new NetworkTrafficHttpClient(new AtomicReference<>());
client.start();
}
@AfterEach
public void dispose() throws Exception
{
if (client != null)
client.stop();
if (server != null)
server.stop();
}
@Test
public void testOpenedClosedAreInvoked() throws Exception
{
startServer(null);
CountDownLatch openedLatch = new CountDownLatch(1);
CountDownLatch closedLatch = new CountDownLatch(1);
connector.setNetworkTrafficListener(new NetworkTrafficListener()
{
public volatile Socket socket;
@Override
public void opened(Socket socket)
{
this.socket = socket;
openedLatch.countDown();
}
@Override
public void closed(Socket socket)
{
if (this.socket == socket)
closedLatch.countDown();
}
});
int port = connector.getLocalPort();
// Connect to the server
try (Socket ignored = new Socket("localhost", port))
{
assertTrue(openedLatch.await(10, TimeUnit.SECONDS));
}
assertTrue(closedLatch.await(10, TimeUnit.SECONDS));
}
@Test
public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse)
{
request.setHandled(true);
}
});
AtomicReference<String> serverIncoming = new AtomicReference<>("");
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
connector.setNetworkTrafficListener(new NetworkTrafficListener()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverIncomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverOutgoingLatch.countDown();
}
});
AtomicReference<String> clientIncoming = new AtomicReference<>("");
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
client.listener.set(new NetworkTrafficListener()
{
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientOutgoingLatch.countDown();
}
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientIncomingLatch.countDown();
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(clientOutgoing.get(), serverIncoming.get());
assertEquals(serverOutgoing.get(), clientIncoming.get());
}
@Test
public void testTrafficWithResponseContentOnPersistentConnection() throws Exception
{
String responseContent = "response_content" + END_OF_CONTENT;
start(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
{
request.setHandled(true);
ServletOutputStream output = servletResponse.getOutputStream();
output.write(responseContent.getBytes(StandardCharsets.UTF_8));
}
});
AtomicReference<String> serverIncoming = new AtomicReference<>("");
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
connector.setNetworkTrafficListener(new NetworkTrafficListener()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverIncomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverOutgoingLatch.countDown();
}
});
AtomicReference<String> clientIncoming = new AtomicReference<>("");
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
client.listener.set(new NetworkTrafficListener()
{
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientOutgoingLatch.countDown();
}
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientIncomingLatch.countDown();
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()).send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertEquals(responseContent, response.getContentAsString());
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(clientOutgoing.get(), serverIncoming.get());
assertEquals(serverOutgoing.get(), clientIncoming.get());
}
@Test
public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception
{
String responseContent = "response_content";
String responseChunk1 = responseContent.substring(0, responseContent.length() / 2);
String responseChunk2 = responseContent.substring(responseContent.length() / 2);
start(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
{
request.setHandled(true);
ServletOutputStream output = servletResponse.getOutputStream();
output.write(responseChunk1.getBytes(StandardCharsets.UTF_8));
output.flush();
output.write(responseChunk2.getBytes(StandardCharsets.UTF_8));
output.flush();
}
});
AtomicReference<String> serverIncoming = new AtomicReference<>("");
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
connector.setNetworkTrafficListener(new NetworkTrafficListener()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverIncomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
if (serverOutgoing.get().endsWith("\r\n0\r\n\r\n"))
serverOutgoingLatch.countDown();
}
});
AtomicReference<String> clientIncoming = new AtomicReference<>("");
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
client.listener.set(new NetworkTrafficListener()
{
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientOutgoingLatch.countDown();
}
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
if (clientIncoming.get().endsWith("\r\n0\r\n\r\n"))
clientIncomingLatch.countDown();
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()).send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(clientOutgoing.get(), serverIncoming.get());
assertEquals(serverOutgoing.get(), clientIncoming.get());
}
@Test
public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception
{
String location = "/redirect";
start(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
{
request.setHandled(true);
servletResponse.sendRedirect(location);
}
});
AtomicReference<String> serverIncoming = new AtomicReference<>("");
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
connector.setNetworkTrafficListener(new NetworkTrafficListener()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverIncomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverOutgoingLatch.countDown();
}
});
AtomicReference<String> clientIncoming = new AtomicReference<>("");
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
client.listener.set(new NetworkTrafficListener()
{
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientOutgoingLatch.countDown();
}
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientIncomingLatch.countDown();
}
});
client.setFollowRedirects(false);
Fields fields = new Fields();
fields.put("a", "1");
fields.put("b", "2");
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.body(new FormRequestContent(fields))
.send();
assertEquals(HttpStatus.FOUND_302, response.getStatus());
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(clientOutgoing.get(), serverIncoming.get());
assertEquals(serverOutgoing.get(), clientIncoming.get());
}
@Test
public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
{
// Read and discard the request body to make the test more
// reliable, otherwise there is a race between request body
// upload and response download
InputStream input = servletRequest.getInputStream();
byte[] buffer = new byte[4096];
while (true)
{
int read = input.read(buffer);
if (read < 0)
break;
}
request.setHandled(true);
}
});
AtomicReference<String> serverIncoming = new AtomicReference<>("");
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
connector.setNetworkTrafficListener(new NetworkTrafficListener()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
serverOutgoingLatch.countDown();
}
});
AtomicReference<String> clientIncoming = new AtomicReference<>("");
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
client.listener.set(new NetworkTrafficListener()
{
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
}
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
clientIncomingLatch.countDown();
}
});
// Generate a large request content.
String requestContent = "0123456789ABCDEF";
for (int i = 0; i < 16; ++i)
{
requestContent += requestContent;
}
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.body(new StringRequestContent(requestContent))
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(clientOutgoing.get(), serverIncoming.get());
assertTrue(clientOutgoing.get().length() > requestContent.length());
assertEquals(serverOutgoing.get(), clientIncoming.get());
}
private static class NetworkTrafficHttpClient extends HttpClient
{
private final AtomicReference<NetworkTrafficListener> listener;
private NetworkTrafficHttpClient(AtomicReference<NetworkTrafficListener> listener)
{
super(new HttpClientTransportOverHTTP(new ClientConnector()
{
@Override
protected SelectorManager newSelectorManager()
{
return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors())
{
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
return new NetworkTrafficSocketChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout().toMillis(), listener.get());
}
};
}
}));
this.listener = listener;
}
}
}

View File

@ -301,7 +301,7 @@ public class ClientConnector extends ContainerLifeCycle
protected class ClientSelectorManager extends SelectorManager
{
protected ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
public ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}

View File

@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
/**
* <p>A listener for raw network traffic within Jetty.</p>
* <p>{@link NetworkTrafficListener}s can be installed in a
* <code>org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector</code>,
* {@code org.eclipse.jetty.server.NetworkTrafficServerConnector},
* and are notified of the following network traffic events:</p>
* <ul>
* <li>Connection opened, when the server has accepted the connection from a remote client</li>
@ -45,7 +45,9 @@ public interface NetworkTrafficListener
*
* @param socket the socket associated with the remote client
*/
public void opened(Socket socket);
default void opened(Socket socket)
{
}
/**
* <p>Callback method invoked when bytes sent by a remote client arrived on the server.</p>
@ -53,7 +55,9 @@ public interface NetworkTrafficListener
* @param socket the socket associated with the remote client
* @param bytes the read-only buffer containing the incoming bytes
*/
public void incoming(Socket socket, ByteBuffer bytes);
default void incoming(Socket socket, ByteBuffer bytes)
{
}
/**
* <p>Callback method invoked when bytes are sent to a remote client from the server.</p>
@ -62,7 +66,9 @@ public interface NetworkTrafficListener
* @param socket the socket associated with the remote client
* @param bytes the read-only buffer containing the outgoing bytes
*/
public void outgoing(Socket socket, ByteBuffer bytes);
default void outgoing(Socket socket, ByteBuffer bytes)
{
}
/**
* <p>Callback method invoked when a connection to a remote client has been closed.</p>
@ -74,31 +80,7 @@ public interface NetworkTrafficListener
*
* @param socket the (closed) socket associated with the remote client
*/
public void closed(Socket socket);
/**
* <p>A commodity class that implements {@link NetworkTrafficListener} with empty methods.</p>
*/
public static class Adapter implements NetworkTrafficListener
default void closed(Socket socket)
{
@Override
public void opened(Socket socket)
{
}
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
}
@Override
public void closed(Socket socket)
{
}
}
}

View File

@ -19,27 +19,28 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetworkTrafficSelectChannelEndPoint extends SocketChannelEndPoint
/**
* <p>A specialized version of {@link SocketChannelEndPoint} that supports {@link NetworkTrafficListener}s.</p>
*/
public class NetworkTrafficSocketChannelEndPoint extends SocketChannelEndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(NetworkTrafficSelectChannelEndPoint.class);
private static final Logger LOG = LoggerFactory.getLogger(NetworkTrafficSocketChannelEndPoint.class);
private final List<NetworkTrafficListener> listeners;
private final NetworkTrafficListener listener;
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List<NetworkTrafficListener> listeners) throws IOException
public NetworkTrafficSocketChannelEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, NetworkTrafficListener listener)
{
super(channel, selectSet, key, scheduler);
setIdleTimeout(idleTimeout);
this.listeners = listeners;
this.listener = listener;
}
@Override
@ -60,7 +61,7 @@ public class NetworkTrafficSelectChannelEndPoint extends SocketChannelEndPoint
{
int position = b.position();
ByteBuffer view = b.slice();
flushed &= super.flush(b);
flushed = super.flush(b);
int l = b.position() - position;
view.limit(view.position() + l);
notifyOutgoing(view);
@ -75,76 +76,63 @@ public class NetworkTrafficSelectChannelEndPoint extends SocketChannelEndPoint
public void onOpen()
{
super.onOpen();
if (listeners != null && !listeners.isEmpty())
if (listener != null)
{
for (NetworkTrafficListener listener : listeners)
try
{
try
{
listener.opened(getSocket());
}
catch (Exception x)
{
LOG.warn("listener.opened failure", x);
}
listener.opened(getSocket());
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}
@Override
public void onClose(Throwable cause)
public void onClose(Throwable failure)
{
super.onClose(cause);
if (listeners != null && !listeners.isEmpty())
super.onClose(failure);
if (listener != null)
{
for (NetworkTrafficListener listener : listeners)
try
{
try
{
listener.closed(getSocket());
}
catch (Exception x)
{
LOG.warn("listener.closed failure", x);
}
listener.closed(getSocket());
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}
public void notifyIncoming(ByteBuffer buffer, int read)
{
if (listeners != null && !listeners.isEmpty() && read > 0)
if (listener != null && read > 0)
{
for (NetworkTrafficListener listener : listeners)
try
{
try
{
ByteBuffer view = buffer.asReadOnlyBuffer();
listener.incoming(getSocket(), view);
}
catch (Exception x)
{
LOG.warn("listener.incoming() failure", x);
}
ByteBuffer view = buffer.asReadOnlyBuffer();
listener.incoming(getSocket(), view);
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}
public void notifyOutgoing(ByteBuffer view)
{
if (listeners != null && !listeners.isEmpty() && view.hasRemaining())
if (listener != null && view.hasRemaining())
{
Socket socket = getSocket();
for (NetworkTrafficListener listener : listeners)
try
{
try
{
listener.outgoing(socket, view);
}
catch (Exception x)
{
LOG.warn("listener.outgoing() failure", x);
}
listener.outgoing(getSocket(), view);
}
catch (Throwable x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}

View File

@ -18,29 +18,26 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>A specialized version of {@link ServerConnector} that supports {@link NetworkTrafficListener}s.</p>
* <p>{@link NetworkTrafficListener}s can be added and removed dynamically before and after this connector has
* been started without causing {@link java.util.ConcurrentModificationException}s.</p>
* <p>A {@link NetworkTrafficListener} can be set and unset dynamically before and after this connector has
* been started.</p>
*/
public class NetworkTrafficServerConnector extends ServerConnector
{
private final List<NetworkTrafficListener> listeners = new CopyOnWriteArrayList<>();
private volatile NetworkTrafficListener listener;
public NetworkTrafficServerConnector(Server server)
{
@ -68,25 +65,24 @@ public class NetworkTrafficServerConnector extends ServerConnector
}
/**
* @param listener the listener to add
* @param listener the listener to set, or null to unset
*/
public void addNetworkTrafficListener(NetworkTrafficListener listener)
public void setNetworkTrafficListener(NetworkTrafficListener listener)
{
listeners.add(listener);
this.listener = listener;
}
/**
* @param listener the listener to remove
* @return the listener
*/
public void removeNetworkTrafficListener(NetworkTrafficListener listener)
public NetworkTrafficListener getNetworkTrafficListener()
{
listeners.remove(listener);
return listener;
}
@Override
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
{
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
return endPoint;
return new NetworkTrafficSocketChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), getNetworkTrafficListener());
}
}

View File

@ -1,23 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
/**
* Jetty Server : Core Server Connector
*/
package org.eclipse.jetty.server.nio;

View File

@ -1,484 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class NetworkTrafficListenerTest
{
private static final byte END_OF_CONTENT = '~';
private Server server;
private NetworkTrafficServerConnector connector;
public void initConnector(Handler handler) throws Exception
{
server = new Server();
connector = new NetworkTrafficServerConnector(server);
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false);
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
@AfterEach
public void destroyConnector() throws Exception
{
if (server != null)
{
server.stop();
server.join();
}
}
@Test
public void testOpenedClosedAreInvoked() throws Exception
{
initConnector(null);
final CountDownLatch openedLatch = new CountDownLatch(1);
final CountDownLatch closedLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
public volatile Socket socket;
@Override
public void opened(Socket socket)
{
this.socket = socket;
openedLatch.countDown();
}
@Override
public void closed(Socket socket)
{
if (this.socket == socket)
closedLatch.countDown();
}
});
int port = connector.getLocalPort();
// Connect to the server
Socket socket = new Socket("localhost", port);
assertTrue(openedLatch.await(10, TimeUnit.SECONDS));
socket.close();
assertTrue(closedLatch.await(10, TimeUnit.SECONDS));
}
@Test
public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception
{
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
}
});
final AtomicReference<String> incomingData = new AtomicReference<>();
final CountDownLatch incomingLatch = new CountDownLatch(1);
final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
outgoingLatch.countDown();
}
});
int port = connector.getLocalPort();
String request =
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"Connection: close\r\n" +
"\r\n";
String expectedResponse =
"HTTP/1.1 200 OK\r\n" +
"Connection: close\r\n" +
"\r\n";
Socket socket = new Socket("localhost", port);
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(request, incomingData.get());
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
assertEquals(expectedResponse, outgoingData.get());
byte[] responseBytes = readResponse(socket);
String response = new String(responseBytes, StandardCharsets.UTF_8);
assertEquals(expectedResponse, response);
socket.close();
}
@Test
public void testTrafficWithResponseContentOnPersistentConnection() throws Exception
{
final String responseContent = "response_content";
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
ServletOutputStream output = servletResponse.getOutputStream();
output.write(responseContent.getBytes(StandardCharsets.UTF_8));
output.write(END_OF_CONTENT);
}
});
final AtomicReference<String> incomingData = new AtomicReference<>();
final CountDownLatch incomingLatch = new CountDownLatch(1);
final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(2);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
outgoingLatch.countDown();
}
});
int port = connector.getLocalPort();
String request =
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"\r\n";
String expectedResponse =
"HTTP/1.1 200 OK\r\n" +
"Content-Length: " + (responseContent.length() + 1) + "\r\n" +
"\r\n" +
"" + responseContent + (char)END_OF_CONTENT;
Socket socket = new Socket("localhost", port);
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(request, incomingData.get());
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
assertEquals(expectedResponse, outgoingData.get());
byte[] responseBytes = readResponse(socket);
String response = new String(responseBytes, StandardCharsets.UTF_8);
assertEquals(expectedResponse, response);
socket.close();
}
@Test
public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception
{
final String responseContent = "response_content";
final String responseChunk1 = "response_content".substring(0, responseContent.length() / 2);
final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length());
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
ServletOutputStream output = servletResponse.getOutputStream();
output.write(responseChunk1.getBytes(StandardCharsets.UTF_8));
output.flush();
output.write(responseChunk2.getBytes(StandardCharsets.UTF_8));
output.flush();
}
});
final AtomicReference<String> incomingData = new AtomicReference<>();
final CountDownLatch incomingLatch = new CountDownLatch(1);
final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
if (outgoingData.get().endsWith("\r\n0\r\n\r\n"))
outgoingLatch.countDown();
}
});
int port = connector.getLocalPort();
String request =
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"\r\n";
String expectedResponse =
"HTTP/1.1 200 OK\r\n" +
"Transfer-Encoding: chunked\r\n" +
"\r\n" +
responseChunk1.length() + "\r\n" +
responseChunk1 + "\r\n" +
responseChunk2.length() + "\r\n" +
responseChunk2 + "\r\n" +
"0\r\n" +
"\r\n";
Socket socket = new Socket("localhost", port);
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(request, incomingData.get());
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
assertEquals(expectedResponse, outgoingData.get());
byte[] responseBytes = readResponse(socket);
String response = new String(responseBytes, StandardCharsets.UTF_8);
assertEquals(expectedResponse, response);
socket.close();
}
@Test
public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception
{
final String location = "/redirect";
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
servletResponse.sendRedirect(location);
}
});
final AtomicReference<String> incomingData = new AtomicReference<>();
final CountDownLatch incomingLatch = new CountDownLatch(1);
final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
outgoingLatch.countDown();
}
});
int port = connector.getLocalPort();
String requestContent = "a=1&b=2";
String request =
"POST / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"Content-Type: application/x-www-form-urlencoded\r\n" +
"Content-Length: " + requestContent.length() + "\r\n" +
"\r\n" +
requestContent;
String expectedResponse =
"HTTP/1.1 302 Found\r\n" +
"Location: http://localhost:" + port + location + "\r\n" +
"Content-Length: 0\r\n" +
"\r\n";
Socket socket = new Socket("localhost", port);
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
assertEquals(request, incomingData.get());
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
assertEquals(expectedResponse, outgoingData.get());
byte[] responseBytes = readResponse(socket);
String response = new String(responseBytes, StandardCharsets.UTF_8);
assertEquals(expectedResponse, response);
socket.close();
}
@Test
public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception
{
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
// Read and discard the request body to make the test more
// reliable, otherwise there is a race between request body
// upload and response download
InputStream input = servletRequest.getInputStream();
byte[] buffer = new byte[4096];
while (true)
{
int read = input.read(buffer);
if (read < 0)
break;
}
request.setHandled(true);
}
});
final AtomicReference<String> incomingData = new AtomicReference<>("");
final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(incomingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
outgoingLatch.countDown();
}
});
int port = connector.getLocalPort();
// Generate 32 KiB of request content
String requestContent = "0123456789ABCDEF";
for (int i = 0; i < 11; ++i)
{
requestContent += requestContent;
}
String request =
"POST / HTTP/1.1\r\n" +
"Host: localhost:" + port + "\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Length: " + requestContent.length() + "\r\n" +
"\r\n" +
requestContent;
String expectedResponse =
"HTTP/1.1 200 OK\r\n" +
"Content-Length: 0\r\n" +
"\r\n";
Socket socket = new Socket("localhost", port);
OutputStream output = socket.getOutputStream();
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
assertEquals(expectedResponse, outgoingData.get());
byte[] responseBytes = readResponse(socket);
String response = new String(responseBytes, StandardCharsets.UTF_8);
assertEquals(expectedResponse, response);
assertEquals(request, incomingData.get());
socket.close();
}
private byte[] readResponse(Socket socket) throws IOException
{
socket.setSoTimeout(5000);
InputStream input = socket.getInputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int read;
while ((read = input.read()) >= 0)
{
baos.write(read);
// Handle non-chunked end of response
if (read == END_OF_CONTENT)
break;
// Handle chunked end of response
String response = baos.toString("UTF-8");
if (response.endsWith("\r\n0\r\n\r\n"))
break;
// Handle non-content responses
if (response.contains("Content-Length: 0") && response.endsWith("\r\n\r\n"))
break;
}
return baos.toByteArray();
}
}