336793 Added Network Traffic Listener
git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2772 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
parent
5d2c3eee1b
commit
3ca737037e
|
@ -0,0 +1,46 @@
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 2011 Intalio, Inc.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
|
||||||
|
package org.eclipse.jetty.io;
|
||||||
|
|
||||||
|
import java.net.Socket;
|
||||||
|
|
||||||
|
public interface NetworkTrafficListener
|
||||||
|
{
|
||||||
|
public void opened(Socket socket);
|
||||||
|
|
||||||
|
public void incoming(Socket socket, Buffer bytes);
|
||||||
|
|
||||||
|
public void outgoing(Socket socket, Buffer bytes);
|
||||||
|
|
||||||
|
public void closed(Socket socket);
|
||||||
|
|
||||||
|
public static class Empty implements NetworkTrafficListener
|
||||||
|
{
|
||||||
|
public void opened(Socket socket)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incoming(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public void outgoing(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public void closed(Socket socket)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,140 @@
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 2011 Intalio, Inc.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
|
||||||
|
package org.eclipse.jetty.io.nio;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.Buffer;
|
||||||
|
import org.eclipse.jetty.io.NetworkTrafficListener;
|
||||||
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
|
||||||
|
public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
|
||||||
|
{
|
||||||
|
private final List<NetworkTrafficListener> listeners;
|
||||||
|
|
||||||
|
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, int maxIdleTime, List<NetworkTrafficListener> listeners) throws IOException
|
||||||
|
{
|
||||||
|
super(channel, selectSet, key, maxIdleTime);
|
||||||
|
this.listeners = listeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int fill(Buffer buffer) throws IOException
|
||||||
|
{
|
||||||
|
int read = super.fill(buffer);
|
||||||
|
notifyIncoming(buffer, read);
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int flush(Buffer buffer) throws IOException
|
||||||
|
{
|
||||||
|
int position = buffer.getIndex();
|
||||||
|
int written = super.flush(buffer);
|
||||||
|
notifyOutgoing(buffer, position, written);
|
||||||
|
return written;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int gatheringFlush(Buffer header, ByteBuffer bbuf0, Buffer buffer, ByteBuffer bbuf1) throws IOException
|
||||||
|
{
|
||||||
|
int headerPosition = header.getIndex();
|
||||||
|
int headerLength = header.length();
|
||||||
|
int bufferPosition = buffer.getIndex();
|
||||||
|
int written = super.gatheringFlush(header, bbuf0, buffer,bbuf1);
|
||||||
|
notifyOutgoing(header, headerPosition, written > headerLength ? headerLength : written);
|
||||||
|
notifyOutgoing(buffer, bufferPosition, written > headerLength ? written - headerLength : 0);
|
||||||
|
return written;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyOpened()
|
||||||
|
{
|
||||||
|
if (listeners != null && !listeners.isEmpty())
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
listener.opened(_socket);
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
Log.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyIncoming(Buffer buffer, int read)
|
||||||
|
{
|
||||||
|
if (listeners != null && !listeners.isEmpty() && read > 0)
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Buffer view = buffer.asReadOnlyBuffer();
|
||||||
|
listener.incoming(_socket, view);
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
Log.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyOutgoing(Buffer buffer, int position, int written)
|
||||||
|
{
|
||||||
|
if (listeners != null && !listeners.isEmpty() && written > 0)
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Buffer view = buffer.asReadOnlyBuffer();
|
||||||
|
view.setGetIndex(position);
|
||||||
|
view.setPutIndex(position + written);
|
||||||
|
listener.outgoing(_socket, view);
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
Log.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyClosed()
|
||||||
|
{
|
||||||
|
if (listeners != null && !listeners.isEmpty())
|
||||||
|
{
|
||||||
|
for (NetworkTrafficListener listener : listeners)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
listener.closed(_socket);
|
||||||
|
}
|
||||||
|
catch (Exception x)
|
||||||
|
{
|
||||||
|
Log.warn(x);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 2011 Intalio, Inc.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
|
||||||
|
package org.eclipse.jetty.server.nio;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.NetworkTrafficListener;
|
||||||
|
import org.eclipse.jetty.io.nio.NetworkTrafficSelectChannelEndPoint;
|
||||||
|
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
|
||||||
|
import org.eclipse.jetty.io.nio.SelectorManager;
|
||||||
|
|
||||||
|
public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
|
||||||
|
{
|
||||||
|
private final List<NetworkTrafficListener> listeners = new CopyOnWriteArrayList<NetworkTrafficListener>();
|
||||||
|
|
||||||
|
public void addNetworkTrafficListener(NetworkTrafficListener listener)
|
||||||
|
{
|
||||||
|
listeners.add(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeNetworkTrafficListener(NetworkTrafficListener listener)
|
||||||
|
{
|
||||||
|
listeners.remove(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key) throws IOException
|
||||||
|
{
|
||||||
|
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, _maxIdleTime, listeners);
|
||||||
|
endPoint.notifyOpened();
|
||||||
|
return endPoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void endPointClosed(SelectChannelEndPoint endpoint)
|
||||||
|
{
|
||||||
|
super.endPointClosed(endpoint);
|
||||||
|
((NetworkTrafficSelectChannelEndPoint)endpoint).notifyClosed();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,454 @@
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 2011 Intalio, Inc.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
|
||||||
|
package org.eclipse.jetty.server;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.Socket;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.ServletOutputStream;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.Buffer;
|
||||||
|
import org.eclipse.jetty.io.NetworkTrafficListener;
|
||||||
|
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
|
import org.eclipse.jetty.server.nio.NetworkTrafficSelectChannelConnector;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class NetworkTrafficListenerTest
|
||||||
|
{
|
||||||
|
private static final byte END_OF_CONTENT = '~';
|
||||||
|
|
||||||
|
private Server server;
|
||||||
|
private NetworkTrafficSelectChannelConnector connector;
|
||||||
|
|
||||||
|
public void initConnector(Handler handler) throws Exception
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
server.setSendDateHeader(false);
|
||||||
|
server.setSendServerVersion(false);
|
||||||
|
|
||||||
|
connector = new NetworkTrafficSelectChannelConnector();
|
||||||
|
server.addConnector(connector);
|
||||||
|
server.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void destroyConnector() throws Exception
|
||||||
|
{
|
||||||
|
if (server != null)
|
||||||
|
{
|
||||||
|
server.stop();
|
||||||
|
server.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOpenedClosedAreInvoked() throws Exception
|
||||||
|
{
|
||||||
|
initConnector(null);
|
||||||
|
|
||||||
|
final CountDownLatch openedLatch = new CountDownLatch(1);
|
||||||
|
final CountDownLatch closedLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener.Empty()
|
||||||
|
{
|
||||||
|
public volatile Socket socket;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void opened(Socket socket)
|
||||||
|
{
|
||||||
|
this.socket = socket;
|
||||||
|
openedLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void closed(Socket socket)
|
||||||
|
{
|
||||||
|
if (this.socket == socket)
|
||||||
|
closedLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
// Connect to the server
|
||||||
|
Socket socket = new Socket("localhost", port);
|
||||||
|
assertTrue(openedLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
assertTrue(closedLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithNoResponseContentOnNonPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
initConnector(new AbstractHandler()
|
||||||
|
{
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final AtomicReference<String> incomingData = new AtomicReference<String>();
|
||||||
|
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
||||||
|
final AtomicReference<String> outgoingData = new AtomicReference<String>("");
|
||||||
|
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener.Empty()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void incoming(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
incomingData.set(bytes.toString("UTF-8"));
|
||||||
|
incomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void outgoing(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
outgoingData.set(outgoingData.get() + bytes.toString("UTF-8"));
|
||||||
|
outgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
String request = "" +
|
||||||
|
"GET / HTTP/1.1\r\n" +
|
||||||
|
"Host: localhost:" + port + "\r\n" +
|
||||||
|
"Connection: close\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
String expectedResponse = "" +
|
||||||
|
"HTTP/1.1 200 OK\r\n" +
|
||||||
|
"Connection: close\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
|
||||||
|
Socket socket = new Socket("localhost", port);
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(request, incomingData.get());
|
||||||
|
|
||||||
|
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(expectedResponse, outgoingData.get());
|
||||||
|
|
||||||
|
byte[] responseBytes = readResponse(socket);
|
||||||
|
String response = new String(responseBytes, "UTF-8");
|
||||||
|
assertEquals(expectedResponse, response);
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithResponseContentOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
final String responseContent = "response_content";
|
||||||
|
initConnector(new AbstractHandler()
|
||||||
|
{
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
ServletOutputStream output = servletResponse.getOutputStream();
|
||||||
|
output.write(responseContent.getBytes("UTF-8"));
|
||||||
|
output.write(END_OF_CONTENT);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final AtomicReference<String> incomingData = new AtomicReference<String>();
|
||||||
|
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
||||||
|
final AtomicReference<String> outgoingData = new AtomicReference<String>("");
|
||||||
|
final CountDownLatch outgoingLatch = new CountDownLatch(2);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener.Empty()
|
||||||
|
{
|
||||||
|
public void incoming(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
incomingData.set(bytes.toString("UTF-8"));
|
||||||
|
incomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void outgoing(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
outgoingData.set(outgoingData.get() + bytes.toString("UTF-8"));
|
||||||
|
outgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
String request = "" +
|
||||||
|
"GET / HTTP/1.1\r\n" +
|
||||||
|
"Host: localhost:" + port + "\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
String expectedResponse = "" +
|
||||||
|
"HTTP/1.1 200 OK\r\n" +
|
||||||
|
"Content-Length: " + (responseContent.length() + 1) + "\r\n" +
|
||||||
|
"\r\n" +
|
||||||
|
"" + responseContent + (char)END_OF_CONTENT;
|
||||||
|
|
||||||
|
Socket socket = new Socket("localhost", port);
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(request, incomingData.get());
|
||||||
|
|
||||||
|
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(expectedResponse, outgoingData.get());
|
||||||
|
|
||||||
|
byte[] responseBytes = readResponse(socket);
|
||||||
|
String response = new String(responseBytes, "UTF-8");
|
||||||
|
assertEquals(expectedResponse, response);
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithResponseContentChunkedOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
final String responseContent = "response_content";
|
||||||
|
final String responseChunk1 = "response_content".substring(0, responseContent.length() / 2);
|
||||||
|
final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length());
|
||||||
|
initConnector(new AbstractHandler()
|
||||||
|
{
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
ServletOutputStream output = servletResponse.getOutputStream();
|
||||||
|
output.write(responseChunk1.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
output.write(responseChunk2.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final AtomicReference<String> incomingData = new AtomicReference<String>();
|
||||||
|
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
||||||
|
final AtomicReference<String> outgoingData = new AtomicReference<String>("");
|
||||||
|
final CountDownLatch outgoingLatch = new CountDownLatch(4);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener.Empty()
|
||||||
|
{
|
||||||
|
public void incoming(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
incomingData.set(bytes.toString("UTF-8"));
|
||||||
|
incomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void outgoing(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
outgoingData.set(outgoingData.get() + bytes.toString("UTF-8"));
|
||||||
|
outgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
String request = "" +
|
||||||
|
"GET / HTTP/1.1\r\n" +
|
||||||
|
"Host: localhost:" + port + "\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
String expectedResponse = "" +
|
||||||
|
"HTTP/1.1 200 OK\r\n" +
|
||||||
|
"Transfer-Encoding: chunked\r\n" +
|
||||||
|
"\r\n" +
|
||||||
|
responseChunk1.length() + "\r\n" +
|
||||||
|
responseChunk1 + "\r\n" +
|
||||||
|
responseChunk2.length() + "\r\n" +
|
||||||
|
responseChunk2 + "\r\n" +
|
||||||
|
"0\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
|
||||||
|
Socket socket = new Socket("localhost", port);
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(request, incomingData.get());
|
||||||
|
|
||||||
|
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(expectedResponse, outgoingData.get());
|
||||||
|
|
||||||
|
byte[] responseBytes = readResponse(socket);
|
||||||
|
String response = new String(responseBytes, "UTF-8");
|
||||||
|
assertEquals(expectedResponse, response);
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
final String location = "/redirect";
|
||||||
|
initConnector(new AbstractHandler()
|
||||||
|
{
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
servletResponse.sendRedirect(location);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final AtomicReference<String> incomingData = new AtomicReference<String>();
|
||||||
|
final CountDownLatch incomingLatch = new CountDownLatch(1);
|
||||||
|
final AtomicReference<String> outgoingData = new AtomicReference<String>("");
|
||||||
|
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener.Empty()
|
||||||
|
{
|
||||||
|
public void incoming(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
incomingData.set(bytes.toString("UTF-8"));
|
||||||
|
incomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void outgoing(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
outgoingData.set(outgoingData.get() + bytes.toString("UTF-8"));
|
||||||
|
outgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
String requestContent = "a=1&b=2";
|
||||||
|
String request = "" +
|
||||||
|
"POST / HTTP/1.1\r\n" +
|
||||||
|
"Host: localhost:" + port + "\r\n" +
|
||||||
|
"Content-Type: application/x-www-form-urlencoded\r\n" +
|
||||||
|
"Content-Length: " + requestContent.length() + "\r\n" +
|
||||||
|
"\r\n" +
|
||||||
|
requestContent;
|
||||||
|
String expectedResponse = "" +
|
||||||
|
"HTTP/1.1 302 Found\r\n" +
|
||||||
|
"Location: http://localhost:" + port + location + "\r\n" +
|
||||||
|
"Content-Length: 0\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
|
||||||
|
Socket socket = new Socket("localhost", port);
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(request, incomingData.get());
|
||||||
|
|
||||||
|
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(expectedResponse, outgoingData.get());
|
||||||
|
|
||||||
|
byte[] responseBytes = readResponse(socket);
|
||||||
|
String response = new String(responseBytes, "UTF-8");
|
||||||
|
assertEquals(expectedResponse, response);
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrafficWithBigRequestContentOnPersistentConnection() throws Exception
|
||||||
|
{
|
||||||
|
initConnector(new AbstractHandler()
|
||||||
|
{
|
||||||
|
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
|
||||||
|
{
|
||||||
|
request.setHandled(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final AtomicReference<String> incomingData = new AtomicReference<String>("");
|
||||||
|
final CountDownLatch incomingLatch = new CountDownLatch(4);
|
||||||
|
final AtomicReference<String> outgoingData = new AtomicReference<String>("");
|
||||||
|
final CountDownLatch outgoingLatch = new CountDownLatch(1);
|
||||||
|
connector.addNetworkTrafficListener(new NetworkTrafficListener.Empty()
|
||||||
|
{
|
||||||
|
public void incoming(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
incomingData.set(incomingData.get() + bytes.toString("UTF-8"));
|
||||||
|
incomingLatch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void outgoing(Socket socket, Buffer bytes)
|
||||||
|
{
|
||||||
|
outgoingData.set(outgoingData.get() + bytes.toString("UTF-8"));
|
||||||
|
outgoingLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
int port = connector.getLocalPort();
|
||||||
|
|
||||||
|
// Generate 32 KiB of request content
|
||||||
|
String requestContent = "0123456789ABCDEF";
|
||||||
|
for (int i = 0; i < 11; ++i)
|
||||||
|
requestContent += requestContent;
|
||||||
|
String request = "" +
|
||||||
|
"POST / HTTP/1.1\r\n" +
|
||||||
|
"Host: localhost:" + port + "\r\n" +
|
||||||
|
"Content-Type: text/plain\r\n" +
|
||||||
|
"Content-Length: " + requestContent.length() + "\r\n" +
|
||||||
|
"\r\n" +
|
||||||
|
requestContent;
|
||||||
|
String expectedResponse = "" +
|
||||||
|
"HTTP/1.1 200 OK\r\n" +
|
||||||
|
"Content-Length: 0\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
|
||||||
|
Socket socket = new Socket("localhost", port);
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertTrue(incomingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(request, incomingData.get());
|
||||||
|
|
||||||
|
assertTrue(outgoingLatch.await(1, TimeUnit.SECONDS));
|
||||||
|
assertEquals(expectedResponse, outgoingData.get());
|
||||||
|
|
||||||
|
byte[] responseBytes = readResponse(socket);
|
||||||
|
String response = new String(responseBytes, "UTF-8");
|
||||||
|
assertEquals(expectedResponse, response);
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] readResponse(Socket socket) throws IOException
|
||||||
|
{
|
||||||
|
socket.setSoTimeout(5000);
|
||||||
|
InputStream input = socket.getInputStream();
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
int read;
|
||||||
|
while ((read = input.read()) >= 0)
|
||||||
|
{
|
||||||
|
baos.write(read);
|
||||||
|
|
||||||
|
// Handle non-chunked end of response
|
||||||
|
if (read == END_OF_CONTENT)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Handle chunked end of response
|
||||||
|
String response = baos.toString("UTF-8");
|
||||||
|
if (response.endsWith("\r\n0\r\n\r\n"))
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Handle non-content responses
|
||||||
|
if (response.contains("Content-Length: 0") && response.endsWith("\r\n\r\n"))
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return baos.toByteArray();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue