Merge pull request #4754 from eclipse/jetty-9.4.x-4751-refresh_networktraffic_classes
Fixes #4751 - Refresh NetworkTraffic* classes.
This commit is contained in:
commit
103c22dc43
|
@ -0,0 +1,526 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.SelectableChannel;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import javax.servlet.ServletOutputStream;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.api.ContentResponse;
|
||||||
|
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||||
|
import org.eclipse.jetty.client.util.FormContentProvider;
|
||||||
|
import org.eclipse.jetty.client.util.StringContentProvider;
|
||||||
|
import org.eclipse.jetty.http.HttpHeader;
|
||||||
|
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||||
|
import org.eclipse.jetty.http.HttpStatus;
|
||||||
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
|
import org.eclipse.jetty.io.ManagedSelector;
|
||||||
|
import org.eclipse.jetty.io.NetworkTrafficListener;
|
||||||
|
import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint;
|
||||||
|
import org.eclipse.jetty.io.SelectorManager;
|
||||||
|
import org.eclipse.jetty.server.Handler;
|
||||||
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
|
import org.eclipse.jetty.server.NetworkTrafficServerConnector;
|
||||||
|
import org.eclipse.jetty.server.Request;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
import org.eclipse.jetty.util.Fields;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class NetworkTrafficListenerTest
|
||||||
|
{
|
||||||
|
private static final String END_OF_CONTENT = "~";
|
||||||
|
|
||||||
|
private Server server;
|
||||||
|
private NetworkTrafficServerConnector connector;
|
||||||
|
private NetworkTrafficHttpClient client;
|
||||||
|
|
||||||
|
private void start(Handler handler) throws Exception
|
||||||
|
{
|
||||||
|
startServer(handler);
|
||||||
|
startClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startServer(Handler handler) throws Exception
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
connector = new NetworkTrafficServerConnector(server);
|
||||||
|
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false);
|
||||||
|
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false);
|
||||||
|
server.addConnector(connector);
|
||||||
|
server.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startClient() throws Exception
|
||||||
|
{
|
||||||
|
client = new NetworkTrafficHttpClient(new ArrayList<>());
|
||||||
|
client.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void dispose() throws Exception
|
||||||
|
{
|
||||||
|
if (client != null)
|
||||||
|
client.stop();
|
||||||
|
if (server != null)
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOpenedClosedAreInvoked() throws Exception
|
||||||
|
{
|
||||||
|
startServer(null);
|
||||||
|
|
||||||
|
CountDownLatch openedLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch closedLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
public volatile Socket socket;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void opened(Socket socket)
|
||||||
|
{
|
||||||
|
this.socket = socket;
|
||||||
|
openedLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void closed(Socket socket)
|
||||||
|
{
|
||||||
|
if (this.socket == socket)
|
||||||
|
closedLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
// Connect to the server
|
||||||
|
try (Socket ignored = new Socket("localhost", port))
|
||||||
|
{
|
||||||
|
assertTrue(openedLatch.await(10, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
assertTrue(closedLatch.await(10, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse)
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> serverIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> clientIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
|
||||||
|
client.listeners.add(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
|
||||||
|
.send();
|
||||||
|
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||||
|
|
||||||
|
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(clientOutgoing.get(), serverIncoming.get());
|
||||||
|
assertEquals(serverOutgoing.get(), clientIncoming.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithResponseContentOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
String responseContent = "response_content" + END_OF_CONTENT;
|
||||||
|
start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
ServletOutputStream output = servletResponse.getOutputStream();
|
||||||
|
output.write(responseContent.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> serverIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> clientIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
|
||||||
|
client.listeners.add(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()).send();
|
||||||
|
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||||
|
assertEquals(responseContent, response.getContentAsString());
|
||||||
|
|
||||||
|
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(clientOutgoing.get(), serverIncoming.get());
|
||||||
|
assertEquals(serverOutgoing.get(), clientIncoming.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
String responseContent = "response_content";
|
||||||
|
String responseChunk1 = responseContent.substring(0, responseContent.length() / 2);
|
||||||
|
String responseChunk2 = responseContent.substring(responseContent.length() / 2);
|
||||||
|
start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
ServletOutputStream output = servletResponse.getOutputStream();
|
||||||
|
output.write(responseChunk1.getBytes(StandardCharsets.UTF_8));
|
||||||
|
output.flush();
|
||||||
|
output.write(responseChunk2.getBytes(StandardCharsets.UTF_8));
|
||||||
|
output.flush();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> serverIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
if (serverOutgoing.get().endsWith("\r\n0\r\n\r\n"))
|
||||||
|
serverOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> clientIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
|
||||||
|
client.listeners.add(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
if (clientIncoming.get().endsWith("\r\n0\r\n\r\n"))
|
||||||
|
clientIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()).send();
|
||||||
|
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||||
|
|
||||||
|
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(clientOutgoing.get(), serverIncoming.get());
|
||||||
|
assertEquals(serverOutgoing.get(), clientIncoming.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
String location = "/redirect";
|
||||||
|
start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
servletResponse.sendRedirect(location);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> serverIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> clientIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientOutgoingLatch = new CountDownLatch(1);
|
||||||
|
client.listeners.add(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
client.setFollowRedirects(false);
|
||||||
|
Fields fields = new Fields();
|
||||||
|
fields.put("a", "1");
|
||||||
|
fields.put("b", "2");
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.content(new FormContentProvider(fields))
|
||||||
|
.send();
|
||||||
|
assertEquals(HttpStatus.FOUND_302, response.getStatus());
|
||||||
|
|
||||||
|
assertTrue(clientOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(clientOutgoing.get(), serverIncoming.get());
|
||||||
|
assertEquals(serverOutgoing.get(), clientIncoming.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
start(new AbstractHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException
|
||||||
|
{
|
||||||
|
// Read and discard the request body to make the test more
|
||||||
|
// reliable, otherwise there is a race between request body
|
||||||
|
// upload and response download
|
||||||
|
InputStream input = servletRequest.getInputStream();
|
||||||
|
byte[] buffer = new byte[4096];
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
int read = input.read(buffer);
|
||||||
|
if (read < 0)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
request.setHandled(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> serverIncoming = new AtomicReference<>("");
|
||||||
|
AtomicReference<String> serverOutgoing = new AtomicReference<>("");
|
||||||
|
CountDownLatch serverOutgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverIncoming.set(serverIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
serverOutgoing.set(serverOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
serverOutgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AtomicReference<String> clientIncoming = new AtomicReference<>("");
|
||||||
|
CountDownLatch clientIncomingLatch = new CountDownLatch(1);
|
||||||
|
AtomicReference<String> clientOutgoing = new AtomicReference<>("");
|
||||||
|
client.listeners.add(new NetworkTrafficListener()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientOutgoing.set(clientOutgoing.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
clientIncoming.set(clientIncoming.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
||||||
|
clientIncomingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Generate a large request content.
|
||||||
|
String requestContent = "0123456789ABCDEF";
|
||||||
|
for (int i = 0; i < 16; ++i)
|
||||||
|
{
|
||||||
|
requestContent += requestContent;
|
||||||
|
}
|
||||||
|
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.content(new StringContentProvider(requestContent))
|
||||||
|
.send();
|
||||||
|
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||||
|
|
||||||
|
assertTrue(serverOutgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertTrue(clientIncomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(clientOutgoing.get(), serverIncoming.get());
|
||||||
|
assertTrue(clientOutgoing.get().length() > requestContent.length());
|
||||||
|
assertEquals(serverOutgoing.get(), clientIncoming.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class NetworkTrafficHttpClient extends HttpClient
|
||||||
|
{
|
||||||
|
private final List<NetworkTrafficListener> listeners;
|
||||||
|
|
||||||
|
private NetworkTrafficHttpClient(List<NetworkTrafficListener> listeners)
|
||||||
|
{
|
||||||
|
super(new HttpClientTransportOverHTTP()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected SelectorManager newSelectorManager(HttpClient client)
|
||||||
|
{
|
||||||
|
return new AbstractConnectorHttpClientTransport.ClientSelectorManager(client, getSelectors())
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
|
||||||
|
{
|
||||||
|
return new NetworkTrafficSocketChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout(), listeners);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}, null);
|
||||||
|
this.listeners = listeners;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
|
||||||
/**
|
/**
|
||||||
* <p>A listener for raw network traffic within Jetty.</p>
|
* <p>A listener for raw network traffic within Jetty.</p>
|
||||||
* <p>{@link NetworkTrafficListener}s can be installed in a
|
* <p>{@link NetworkTrafficListener}s can be installed in a
|
||||||
* <code>org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector</code>,
|
* {@code org.eclipse.jetty.server.NetworkTrafficServerConnector},
|
||||||
* and are notified of the following network traffic events:</p>
|
* and are notified of the following network traffic events:</p>
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>Connection opened, when the server has accepted the connection from a remote client</li>
|
* <li>Connection opened, when the server has accepted the connection from a remote client</li>
|
||||||
|
@ -45,7 +45,9 @@ public interface NetworkTrafficListener
|
||||||
*
|
*
|
||||||
* @param socket the socket associated with the remote client
|
* @param socket the socket associated with the remote client
|
||||||
*/
|
*/
|
||||||
void opened(Socket socket);
|
default void opened(Socket socket)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Callback method invoked when bytes sent by a remote client arrived on the server.</p>
|
* <p>Callback method invoked when bytes sent by a remote client arrived on the server.</p>
|
||||||
|
@ -53,7 +55,9 @@ public interface NetworkTrafficListener
|
||||||
* @param socket the socket associated with the remote client
|
* @param socket the socket associated with the remote client
|
||||||
* @param bytes the read-only buffer containing the incoming bytes
|
* @param bytes the read-only buffer containing the incoming bytes
|
||||||
*/
|
*/
|
||||||
void incoming(Socket socket, ByteBuffer bytes);
|
default void incoming(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Callback method invoked when bytes are sent to a remote client from the server.</p>
|
* <p>Callback method invoked when bytes are sent to a remote client from the server.</p>
|
||||||
|
@ -62,7 +66,9 @@ public interface NetworkTrafficListener
|
||||||
* @param socket the socket associated with the remote client
|
* @param socket the socket associated with the remote client
|
||||||
* @param bytes the read-only buffer containing the outgoing bytes
|
* @param bytes the read-only buffer containing the outgoing bytes
|
||||||
*/
|
*/
|
||||||
void outgoing(Socket socket, ByteBuffer bytes);
|
default void outgoing(Socket socket, ByteBuffer bytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Callback method invoked when a connection to a remote client has been closed.</p>
|
* <p>Callback method invoked when a connection to a remote client has been closed.</p>
|
||||||
|
@ -74,11 +80,15 @@ public interface NetworkTrafficListener
|
||||||
*
|
*
|
||||||
* @param socket the (closed) socket associated with the remote client
|
* @param socket the (closed) socket associated with the remote client
|
||||||
*/
|
*/
|
||||||
void closed(Socket socket);
|
default void closed(Socket socket)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>A commodity class that implements {@link NetworkTrafficListener} with empty methods.</p>
|
* <p>A commodity class that implements {@link NetworkTrafficListener} with empty methods.</p>
|
||||||
|
* @deprecated use {@link NetworkTrafficListener} instead
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
class Adapter implements NetworkTrafficListener
|
class Adapter implements NetworkTrafficListener
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,133 +18,20 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.io;
|
package org.eclipse.jetty.io;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.log.Log;
|
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
|
||||||
public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
|
/**
|
||||||
|
* @deprecated use {@link NetworkTrafficSocketChannelEndPoint} instead
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public class NetworkTrafficSelectChannelEndPoint extends NetworkTrafficSocketChannelEndPoint
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(NetworkTrafficSelectChannelEndPoint.class);
|
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List<NetworkTrafficListener> listeners)
|
||||||
|
|
||||||
private final List<NetworkTrafficListener> listeners;
|
|
||||||
|
|
||||||
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List<NetworkTrafficListener> listeners) throws IOException
|
|
||||||
{
|
{
|
||||||
super(channel, selectSet, key, scheduler, idleTimeout);
|
super(channel, selectSet, key, scheduler, idleTimeout, listeners);
|
||||||
this.listeners = listeners;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int fill(ByteBuffer buffer) throws IOException
|
|
||||||
{
|
|
||||||
int read = super.fill(buffer);
|
|
||||||
notifyIncoming(buffer, read);
|
|
||||||
return read;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean flush(ByteBuffer... buffers) throws IOException
|
|
||||||
{
|
|
||||||
boolean flushed = true;
|
|
||||||
for (ByteBuffer b : buffers)
|
|
||||||
{
|
|
||||||
if (b.hasRemaining())
|
|
||||||
{
|
|
||||||
int position = b.position();
|
|
||||||
ByteBuffer view = b.slice();
|
|
||||||
flushed &= super.flush(b);
|
|
||||||
int l = b.position() - position;
|
|
||||||
view.limit(view.position() + l);
|
|
||||||
notifyOutgoing(view);
|
|
||||||
if (!flushed)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return flushed;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onOpen()
|
|
||||||
{
|
|
||||||
super.onOpen();
|
|
||||||
if (listeners != null && !listeners.isEmpty())
|
|
||||||
{
|
|
||||||
for (NetworkTrafficListener listener : listeners)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
listener.opened(getSocket());
|
|
||||||
}
|
|
||||||
catch (Exception x)
|
|
||||||
{
|
|
||||||
LOG.warn(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onClose()
|
|
||||||
{
|
|
||||||
super.onClose();
|
|
||||||
if (listeners != null && !listeners.isEmpty())
|
|
||||||
{
|
|
||||||
for (NetworkTrafficListener listener : listeners)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
listener.closed(getSocket());
|
|
||||||
}
|
|
||||||
catch (Exception x)
|
|
||||||
{
|
|
||||||
LOG.warn(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void notifyIncoming(ByteBuffer buffer, int read)
|
|
||||||
{
|
|
||||||
if (listeners != null && !listeners.isEmpty() && read > 0)
|
|
||||||
{
|
|
||||||
for (NetworkTrafficListener listener : listeners)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
ByteBuffer view = buffer.asReadOnlyBuffer();
|
|
||||||
listener.incoming(getSocket(), view);
|
|
||||||
}
|
|
||||||
catch (Exception x)
|
|
||||||
{
|
|
||||||
LOG.warn(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void notifyOutgoing(ByteBuffer view)
|
|
||||||
{
|
|
||||||
if (listeners != null && !listeners.isEmpty() && view.hasRemaining())
|
|
||||||
{
|
|
||||||
Socket socket = getSocket();
|
|
||||||
for (NetworkTrafficListener listener : listeners)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
listener.outgoing(socket, view);
|
|
||||||
}
|
|
||||||
catch (Exception x)
|
|
||||||
{
|
|
||||||
LOG.warn(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.io;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.SelectableChannel;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A specialized version of {@link SocketChannelEndPoint} that supports {@link NetworkTrafficListener}s.</p>
|
||||||
|
*/
|
||||||
|
public class NetworkTrafficSocketChannelEndPoint extends SocketChannelEndPoint
|
||||||
|
{
|
||||||
|
private static final Logger LOG = Log.getLogger(NetworkTrafficSocketChannelEndPoint.class);
|
||||||
|
|
||||||
|
private final List<NetworkTrafficListener> listeners;
|
||||||
|
|
||||||
|
public NetworkTrafficSocketChannelEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey key, Scheduler scheduler, long idleTimeout, List<NetworkTrafficListener> listeners)
|
||||||
|
{
|
||||||
|
super(channel, selectSet, key, scheduler);
|
||||||
|
setIdleTimeout(idleTimeout);
|
||||||
|
this.listeners = listeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int fill(ByteBuffer buffer) throws IOException
|
||||||
|
{
|
||||||
|
int read = super.fill(buffer);
|
||||||
|
notifyIncoming(buffer, read);
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean flush(ByteBuffer... buffers) throws IOException
|
||||||
|
{
|
||||||
|
boolean flushed = true;
|
||||||
|
for (ByteBuffer b : buffers)
|
||||||
|
{
|
||||||
|
if (b.hasRemaining())
|
||||||
|
{
|
||||||
|
int position = b.position();
|
||||||
|
ByteBuffer view = b.slice();
|
||||||
|
flushed = super.flush(b);
|
||||||
|
int l = b.position() - position;
|
||||||
|
view.limit(view.position() + l);
|
||||||
|
notifyOutgoing(view);
|
||||||
|
if (!flushed)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return flushed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onOpen()
|
||||||
|
{
|
||||||
|
super.onOpen();
|
||||||
|
if (listeners != null && !listeners.isEmpty())
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
listener.opened(getSocket());
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
LOG.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose()
|
||||||
|
{
|
||||||
|
super.onClose();
|
||||||
|
if (listeners != null && !listeners.isEmpty())
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
listener.closed(getSocket());
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
LOG.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyIncoming(ByteBuffer buffer, int read)
|
||||||
|
{
|
||||||
|
if (listeners != null && !listeners.isEmpty() && read > 0)
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ByteBuffer view = buffer.asReadOnlyBuffer();
|
||||||
|
listener.incoming(getSocket(), view);
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
LOG.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyOutgoing(ByteBuffer view)
|
||||||
|
{
|
||||||
|
if (listeners != null && !listeners.isEmpty() && view.hasRemaining())
|
||||||
|
{
|
||||||
|
Socket socket = getSocket();
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
listener.outgoing(socket, view);
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
LOG.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.server;
|
package org.eclipse.jetty.server;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -29,7 +28,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
|
||||||
import org.eclipse.jetty.io.ChannelEndPoint;
|
import org.eclipse.jetty.io.ChannelEndPoint;
|
||||||
import org.eclipse.jetty.io.ManagedSelector;
|
import org.eclipse.jetty.io.ManagedSelector;
|
||||||
import org.eclipse.jetty.io.NetworkTrafficListener;
|
import org.eclipse.jetty.io.NetworkTrafficListener;
|
||||||
import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
|
import org.eclipse.jetty.io.NetworkTrafficSocketChannelEndPoint;
|
||||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
|
||||||
|
@ -84,9 +83,8 @@ public class NetworkTrafficServerConnector extends ServerConnector
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
|
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key)
|
||||||
{
|
{
|
||||||
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
|
return new NetworkTrafficSocketChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
|
||||||
return endPoint;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,484 +0,0 @@
|
||||||
//
|
|
||||||
// ========================================================================
|
|
||||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
|
||||||
// ------------------------------------------------------------------------
|
|
||||||
// All rights reserved. This program and the accompanying materials
|
|
||||||
// are made available under the terms of the Eclipse Public License v1.0
|
|
||||||
// and Apache License v2.0 which accompanies this distribution.
|
|
||||||
//
|
|
||||||
// The Eclipse Public License is available at
|
|
||||||
// http://www.eclipse.org/legal/epl-v10.html
|
|
||||||
//
|
|
||||||
// The Apache License v2.0 is available at
|
|
||||||
// http://www.opensource.org/licenses/apache2.0.php
|
|
||||||
//
|
|
||||||
// You may elect to redistribute this code under either of these licenses.
|
|
||||||
// ========================================================================
|
|
||||||
//
|
|
||||||
|
|
||||||
package org.eclipse.jetty.server;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import javax.servlet.ServletException;
|
|
||||||
import javax.servlet.ServletOutputStream;
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
|
||||||
|
|
||||||
import org.eclipse.jetty.io.NetworkTrafficListener;
|
|
||||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
|
||||||
import org.junit.jupiter.api.AfterEach;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
||||||
|
|
||||||
public class NetworkTrafficListenerTest
|
|
||||||
{
|
|
||||||
private static final byte END_OF_CONTENT = '~';
|
|
||||||
|
|
||||||
private Server server;
|
|
||||||
private NetworkTrafficServerConnector connector;
|
|
||||||
|
|
||||||
public void initConnector(Handler handler) throws Exception
|
|
||||||
{
|
|
||||||
server = new Server();
|
|
||||||
|
|
||||||
connector = new NetworkTrafficServerConnector(server);
|
|
||||||
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendDateHeader(false);
|
|
||||||
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false);
|
|
||||||
server.addConnector(connector);
|
|
||||||
server.setHandler(handler);
|
|
||||||
server.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterEach
|
|
||||||
public void destroyConnector() throws Exception
|
|
||||||
{
|
|
||||||
if (server != null)
|
|
||||||
{
|
|
||||||
server.stop();
|
|
||||||
server.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOpenedClosedAreInvoked() throws Exception
|
|
||||||
{
|
|
||||||
initConnector(null);
|
|
||||||
|
|
||||||
final CountDownLatch openedLatch = new CountDownLatch(1);
|
|
||||||
final CountDownLatch closedLatch = new CountDownLatch(1);
|
|
||||||
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
|
|
||||||
{
|
|
||||||
public volatile Socket socket;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void opened(Socket socket)
|
|
||||||
{
|
|
||||||
this.socket = socket;
|
|
||||||
openedLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void closed(Socket socket)
|
|
||||||
{
|
|
||||||
if (this.socket == socket)
|
|
||||||
closedLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
int port = connector.getLocalPort();
|
|
||||||
|
|
||||||
// Connect to the server
|
|
||||||
Socket socket = new Socket("localhost", port);
|
|
||||||
assertTrue(openedLatch.await(10, TimeUnit.SECONDS));
|
|
||||||
|
|
||||||
socket.close();
|
|
||||||
assertTrue(closedLatch.await(10, TimeUnit.SECONDS));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception
|
|
||||||
{
|
|
||||||
initConnector(new AbstractHandler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
|
||||||
{
|
|
||||||
request.setHandled(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final AtomicReference<String> incomingData = new AtomicReference<>();
|
|
||||||
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
|
||||||
final AtomicReference<String> outgoingData = new AtomicReference<>("");
|
|
||||||
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
|
||||||
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void incoming(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
incomingLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void outgoing(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
outgoingLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
int port = connector.getLocalPort();
|
|
||||||
|
|
||||||
String request =
|
|
||||||
"GET / HTTP/1.1\r\n" +
|
|
||||||
"Host: localhost:" + port + "\r\n" +
|
|
||||||
"Connection: close\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
String expectedResponse =
|
|
||||||
"HTTP/1.1 200 OK\r\n" +
|
|
||||||
"Connection: close\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
|
|
||||||
Socket socket = new Socket("localhost", port);
|
|
||||||
OutputStream output = socket.getOutputStream();
|
|
||||||
output.write(request.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
|
|
||||||
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(request, incomingData.get());
|
|
||||||
|
|
||||||
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(expectedResponse, outgoingData.get());
|
|
||||||
|
|
||||||
byte[] responseBytes = readResponse(socket);
|
|
||||||
String response = new String(responseBytes, StandardCharsets.UTF_8);
|
|
||||||
assertEquals(expectedResponse, response);
|
|
||||||
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrafficWithResponseContentOnPersistentConnection() throws Exception
|
|
||||||
{
|
|
||||||
final String responseContent = "response_content";
|
|
||||||
initConnector(new AbstractHandler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
|
||||||
{
|
|
||||||
request.setHandled(true);
|
|
||||||
ServletOutputStream output = servletResponse.getOutputStream();
|
|
||||||
output.write(responseContent.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.write(END_OF_CONTENT);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final AtomicReference<String> incomingData = new AtomicReference<>();
|
|
||||||
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
|
||||||
final AtomicReference<String> outgoingData = new AtomicReference<>("");
|
|
||||||
final CountDownLatch outgoingLatch = new CountDownLatch(2);
|
|
||||||
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void incoming(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
incomingLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void outgoing(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
outgoingLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
int port = connector.getLocalPort();
|
|
||||||
|
|
||||||
String request =
|
|
||||||
"GET / HTTP/1.1\r\n" +
|
|
||||||
"Host: localhost:" + port + "\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
String expectedResponse =
|
|
||||||
"HTTP/1.1 200 OK\r\n" +
|
|
||||||
"Content-Length: " + (responseContent.length() + 1) + "\r\n" +
|
|
||||||
"\r\n" +
|
|
||||||
"" + responseContent + (char)END_OF_CONTENT;
|
|
||||||
|
|
||||||
Socket socket = new Socket("localhost", port);
|
|
||||||
OutputStream output = socket.getOutputStream();
|
|
||||||
output.write(request.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
|
|
||||||
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(request, incomingData.get());
|
|
||||||
|
|
||||||
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(expectedResponse, outgoingData.get());
|
|
||||||
|
|
||||||
byte[] responseBytes = readResponse(socket);
|
|
||||||
String response = new String(responseBytes, StandardCharsets.UTF_8);
|
|
||||||
assertEquals(expectedResponse, response);
|
|
||||||
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception
|
|
||||||
{
|
|
||||||
final String responseContent = "response_content";
|
|
||||||
final String responseChunk1 = "response_content".substring(0, responseContent.length() / 2);
|
|
||||||
final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length());
|
|
||||||
initConnector(new AbstractHandler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
|
||||||
{
|
|
||||||
request.setHandled(true);
|
|
||||||
ServletOutputStream output = servletResponse.getOutputStream();
|
|
||||||
output.write(responseChunk1.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
output.write(responseChunk2.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final AtomicReference<String> incomingData = new AtomicReference<>();
|
|
||||||
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
|
||||||
final AtomicReference<String> outgoingData = new AtomicReference<>("");
|
|
||||||
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
|
||||||
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void incoming(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
incomingLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void outgoing(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
if (outgoingData.get().endsWith("\r\n0\r\n\r\n"))
|
|
||||||
outgoingLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
int port = connector.getLocalPort();
|
|
||||||
|
|
||||||
String request =
|
|
||||||
"GET / HTTP/1.1\r\n" +
|
|
||||||
"Host: localhost:" + port + "\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
String expectedResponse =
|
|
||||||
"HTTP/1.1 200 OK\r\n" +
|
|
||||||
"Transfer-Encoding: chunked\r\n" +
|
|
||||||
"\r\n" +
|
|
||||||
responseChunk1.length() + "\r\n" +
|
|
||||||
responseChunk1 + "\r\n" +
|
|
||||||
responseChunk2.length() + "\r\n" +
|
|
||||||
responseChunk2 + "\r\n" +
|
|
||||||
"0\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
|
|
||||||
Socket socket = new Socket("localhost", port);
|
|
||||||
OutputStream output = socket.getOutputStream();
|
|
||||||
output.write(request.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
|
|
||||||
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(request, incomingData.get());
|
|
||||||
|
|
||||||
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(expectedResponse, outgoingData.get());
|
|
||||||
|
|
||||||
byte[] responseBytes = readResponse(socket);
|
|
||||||
String response = new String(responseBytes, StandardCharsets.UTF_8);
|
|
||||||
assertEquals(expectedResponse, response);
|
|
||||||
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception
|
|
||||||
{
|
|
||||||
final String location = "/redirect";
|
|
||||||
initConnector(new AbstractHandler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
|
||||||
{
|
|
||||||
request.setHandled(true);
|
|
||||||
servletResponse.sendRedirect(location);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final AtomicReference<String> incomingData = new AtomicReference<>();
|
|
||||||
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
|
||||||
final AtomicReference<String> outgoingData = new AtomicReference<>("");
|
|
||||||
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
|
||||||
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void incoming(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
incomingData.set(BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
incomingLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void outgoing(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
outgoingLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
int port = connector.getLocalPort();
|
|
||||||
|
|
||||||
String requestContent = "a=1&b=2";
|
|
||||||
String request =
|
|
||||||
"POST / HTTP/1.1\r\n" +
|
|
||||||
"Host: localhost:" + port + "\r\n" +
|
|
||||||
"Content-Type: application/x-www-form-urlencoded\r\n" +
|
|
||||||
"Content-Length: " + requestContent.length() + "\r\n" +
|
|
||||||
"\r\n" +
|
|
||||||
requestContent;
|
|
||||||
String expectedResponse =
|
|
||||||
"HTTP/1.1 302 Found\r\n" +
|
|
||||||
"Location: http://localhost:" + port + location + "\r\n" +
|
|
||||||
"Content-Length: 0\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
|
|
||||||
Socket socket = new Socket("localhost", port);
|
|
||||||
OutputStream output = socket.getOutputStream();
|
|
||||||
output.write(request.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
|
|
||||||
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(request, incomingData.get());
|
|
||||||
|
|
||||||
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(expectedResponse, outgoingData.get());
|
|
||||||
|
|
||||||
byte[] responseBytes = readResponse(socket);
|
|
||||||
String response = new String(responseBytes, StandardCharsets.UTF_8);
|
|
||||||
assertEquals(expectedResponse, response);
|
|
||||||
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception
|
|
||||||
{
|
|
||||||
initConnector(new AbstractHandler()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
|
||||||
{
|
|
||||||
// Read and discard the request body to make the test more
|
|
||||||
// reliable, otherwise there is a race between request body
|
|
||||||
// upload and response download
|
|
||||||
InputStream input = servletRequest.getInputStream();
|
|
||||||
byte[] buffer = new byte[4096];
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
int read = input.read(buffer);
|
|
||||||
if (read < 0)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
request.setHandled(true);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
final AtomicReference<String> incomingData = new AtomicReference<>("");
|
|
||||||
final AtomicReference<String> outgoingData = new AtomicReference<>("");
|
|
||||||
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
|
||||||
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public void incoming(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
incomingData.set(incomingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void outgoing(Socket socket, ByteBuffer bytes)
|
|
||||||
{
|
|
||||||
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes, StandardCharsets.UTF_8));
|
|
||||||
outgoingLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
int port = connector.getLocalPort();
|
|
||||||
|
|
||||||
// Generate 32 KiB of request content
|
|
||||||
String requestContent = "0123456789ABCDEF";
|
|
||||||
for (int i = 0; i < 11; ++i)
|
|
||||||
{
|
|
||||||
requestContent += requestContent;
|
|
||||||
}
|
|
||||||
String request =
|
|
||||||
"POST / HTTP/1.1\r\n" +
|
|
||||||
"Host: localhost:" + port + "\r\n" +
|
|
||||||
"Content-Type: text/plain\r\n" +
|
|
||||||
"Content-Length: " + requestContent.length() + "\r\n" +
|
|
||||||
"\r\n" +
|
|
||||||
requestContent;
|
|
||||||
String expectedResponse =
|
|
||||||
"HTTP/1.1 200 OK\r\n" +
|
|
||||||
"Content-Length: 0\r\n" +
|
|
||||||
"\r\n";
|
|
||||||
|
|
||||||
Socket socket = new Socket("localhost", port);
|
|
||||||
OutputStream output = socket.getOutputStream();
|
|
||||||
output.write(request.getBytes(StandardCharsets.UTF_8));
|
|
||||||
output.flush();
|
|
||||||
|
|
||||||
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
|
||||||
assertEquals(expectedResponse, outgoingData.get());
|
|
||||||
|
|
||||||
byte[] responseBytes = readResponse(socket);
|
|
||||||
String response = new String(responseBytes, StandardCharsets.UTF_8);
|
|
||||||
assertEquals(expectedResponse, response);
|
|
||||||
|
|
||||||
assertEquals(request, incomingData.get());
|
|
||||||
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] readResponse(Socket socket) throws IOException
|
|
||||||
{
|
|
||||||
socket.setSoTimeout(5000);
|
|
||||||
InputStream input = socket.getInputStream();
|
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
||||||
int read;
|
|
||||||
while ((read = input.read()) >= 0)
|
|
||||||
{
|
|
||||||
baos.write(read);
|
|
||||||
|
|
||||||
// Handle non-chunked end of response
|
|
||||||
if (read == END_OF_CONTENT)
|
|
||||||
break;
|
|
||||||
|
|
||||||
// Handle chunked end of response
|
|
||||||
String response = baos.toString("UTF-8");
|
|
||||||
if (response.endsWith("\r\n0\r\n\r\n"))
|
|
||||||
break;
|
|
||||||
|
|
||||||
// Handle non-content responses
|
|
||||||
if (response.contains("Content-Length: 0") && response.endsWith("\r\n\r\n"))
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return baos.toByteArray();
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue