431519 Fixed NetworkTrafficListener

This commit is contained in:
Greg Wilkins 2014-04-10 16:20:10 +10:00
parent 268ca92ce2
commit 9f3e986119
3 changed files with 51 additions and 30 deletions

View File

@ -19,11 +19,13 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.List; import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
@ -57,9 +59,11 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
if (b.hasRemaining()) if (b.hasRemaining())
{ {
int position = b.position(); int position = b.position();
ByteBuffer view=b.slice();
flushed&=super.flush(b); flushed&=super.flush(b);
int l=b.position()-position; int l=b.position()-position;
notifyOutgoing(b, position, l); view.limit(view.position()+l);
notifyOutgoing(view);
if (!flushed) if (!flushed)
break; break;
} }
@ -68,8 +72,11 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
} }
public void notifyOpened()
@Override
public void onOpen()
{ {
super.onOpen();
if (listeners != null && !listeners.isEmpty()) if (listeners != null && !listeners.isEmpty())
{ {
for (NetworkTrafficListener listener : listeners) for (NetworkTrafficListener listener : listeners)
@ -86,6 +93,27 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
} }
} }
@Override
public void onClose()
{
super.onClose();
if (listeners != null && !listeners.isEmpty())
{
for (NetworkTrafficListener listener : listeners)
{
try
{
listener.closed(getSocket());
}
catch (Exception x)
{
LOG.warn(x);
}
}
}
}
public void notifyIncoming(ByteBuffer buffer, int read) public void notifyIncoming(ByteBuffer buffer, int read)
{ {
if (listeners != null && !listeners.isEmpty() && read > 0) if (listeners != null && !listeners.isEmpty() && read > 0)
@ -105,18 +133,16 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
} }
} }
public void notifyOutgoing(ByteBuffer buffer, int position, int written) public void notifyOutgoing(ByteBuffer view)
{ {
if (listeners != null && !listeners.isEmpty() && written > 0) if (listeners != null && !listeners.isEmpty() && view.hasRemaining())
{ {
Socket socket=getSocket();
for (NetworkTrafficListener listener : listeners) for (NetworkTrafficListener listener : listeners)
{ {
try try
{ {
ByteBuffer view = buffer.slice(); listener.outgoing(socket, view);
view.position(position);
view.limit(position + written);
listener.outgoing(getSocket(), view);
} }
catch (Exception x) catch (Exception x)
{ {
@ -126,21 +152,4 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
} }
} }
public void notifyClosed()
{
if (listeners != null && !listeners.isEmpty())
{
for (NetworkTrafficListener listener : listeners)
{
try
{
listener.closed(getSocket());
}
catch (Exception x)
{
LOG.warn(x);
}
}
}
}
} }

View File

@ -87,7 +87,6 @@ public class NetworkTrafficServerConnector extends ServerConnector
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException
{ {
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners); NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
endPoint.notifyOpened();
return endPoint; return endPoint;
} }
} }

View File

@ -44,7 +44,6 @@ import org.junit.After;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@Ignore
public class NetworkTrafficListenerTest public class NetworkTrafficListenerTest
{ {
private static final byte END_OF_CONTENT = '~'; private static final byte END_OF_CONTENT = '~';
@ -114,6 +113,7 @@ public class NetworkTrafficListenerTest
{ {
initConnector(new AbstractHandler() initConnector(new AbstractHandler()
{ {
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{ {
request.setHandled(true); request.setHandled(true);
@ -176,6 +176,7 @@ public class NetworkTrafficListenerTest
final String responseContent = "response_content"; final String responseContent = "response_content";
initConnector(new AbstractHandler() initConnector(new AbstractHandler()
{ {
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{ {
request.setHandled(true); request.setHandled(true);
@ -191,12 +192,14 @@ public class NetworkTrafficListenerTest
final CountDownLatch outgoingLatch = new CountDownLatch(2); final CountDownLatch outgoingLatch = new CountDownLatch(2);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{ {
@Override
public void incoming(Socket socket, ByteBuffer bytes) public void incoming(Socket socket, ByteBuffer bytes)
{ {
incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8));
incomingLatch.countDown(); incomingLatch.countDown();
} }
@Override
public void outgoing(Socket socket, ByteBuffer bytes) public void outgoing(Socket socket, ByteBuffer bytes)
{ {
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
@ -241,6 +244,7 @@ public class NetworkTrafficListenerTest
final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length()); final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length());
initConnector(new AbstractHandler() initConnector(new AbstractHandler()
{ {
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{ {
request.setHandled(true); request.setHandled(true);
@ -255,18 +259,21 @@ public class NetworkTrafficListenerTest
final AtomicReference<String> incomingData = new AtomicReference<>(); final AtomicReference<String> incomingData = new AtomicReference<>();
final CountDownLatch incomingLatch = new CountDownLatch(1); final CountDownLatch incomingLatch = new CountDownLatch(1);
final AtomicReference<String> outgoingData = new AtomicReference<>(""); final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(4); final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{ {
@Override
public void incoming(Socket socket, ByteBuffer bytes) public void incoming(Socket socket, ByteBuffer bytes)
{ {
incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8));
incomingLatch.countDown(); incomingLatch.countDown();
} }
@Override
public void outgoing(Socket socket, ByteBuffer bytes) public void outgoing(Socket socket, ByteBuffer bytes)
{ {
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
if (outgoingData.get().endsWith("\r\n0\r\n\r\n"))
outgoingLatch.countDown(); outgoingLatch.countDown();
} }
}); });
@ -311,6 +318,7 @@ public class NetworkTrafficListenerTest
final String location = "/redirect"; final String location = "/redirect";
initConnector(new AbstractHandler() initConnector(new AbstractHandler()
{ {
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{ {
request.setHandled(true); request.setHandled(true);
@ -324,12 +332,14 @@ public class NetworkTrafficListenerTest
final CountDownLatch outgoingLatch = new CountDownLatch(1); final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{ {
@Override
public void incoming(Socket socket, ByteBuffer bytes) public void incoming(Socket socket, ByteBuffer bytes)
{ {
incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8));
incomingLatch.countDown(); incomingLatch.countDown();
} }
@Override
public void outgoing(Socket socket, ByteBuffer bytes) public void outgoing(Socket socket, ByteBuffer bytes)
{ {
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
@ -375,6 +385,7 @@ public class NetworkTrafficListenerTest
{ {
initConnector(new AbstractHandler() initConnector(new AbstractHandler()
{ {
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{ {
// Read and discard the request body to make the test more // Read and discard the request body to make the test more
@ -397,11 +408,13 @@ public class NetworkTrafficListenerTest
final CountDownLatch outgoingLatch = new CountDownLatch(1); final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{ {
@Override
public void incoming(Socket socket, ByteBuffer bytes) public void incoming(Socket socket, ByteBuffer bytes)
{ {
incomingData.set(incomingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingData.set(incomingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
} }
@Override
public void outgoing(Socket socket, ByteBuffer bytes) public void outgoing(Socket socket, ByteBuffer bytes)
{ {
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));