431519 Fixed NetworkTrafficListener

This commit is contained in:
Greg Wilkins 2014-04-10 16:20:10 +10:00
parent 866960d5d7
commit ef400675aa
3 changed files with 51 additions and 30 deletions

View File

@ -19,11 +19,13 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
@ -57,9 +59,11 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
if (b.hasRemaining())
{
int position = b.position();
ByteBuffer view=b.slice();
flushed&=super.flush(b);
int l=b.position()-position;
notifyOutgoing(b, position, l);
view.limit(view.position()+l);
notifyOutgoing(view);
if (!flushed)
break;
}
@ -67,9 +71,12 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
return flushed;
}
public void notifyOpened()
@Override
public void onOpen()
{
super.onOpen();
if (listeners != null && !listeners.isEmpty())
{
for (NetworkTrafficListener listener : listeners)
@ -86,6 +93,27 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
}
}
@Override
public void onClose()
{
super.onClose();
if (listeners != null && !listeners.isEmpty())
{
for (NetworkTrafficListener listener : listeners)
{
try
{
listener.closed(getSocket());
}
catch (Exception x)
{
LOG.warn(x);
}
}
}
}
public void notifyIncoming(ByteBuffer buffer, int read)
{
if (listeners != null && !listeners.isEmpty() && read > 0)
@ -105,18 +133,16 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
}
}
public void notifyOutgoing(ByteBuffer buffer, int position, int written)
public void notifyOutgoing(ByteBuffer view)
{
if (listeners != null && !listeners.isEmpty() && written > 0)
if (listeners != null && !listeners.isEmpty() && view.hasRemaining())
{
Socket socket=getSocket();
for (NetworkTrafficListener listener : listeners)
{
try
{
ByteBuffer view = buffer.slice();
view.position(position);
view.limit(position + written);
listener.outgoing(getSocket(), view);
listener.outgoing(socket, view);
}
catch (Exception x)
{
@ -126,21 +152,4 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
}
}
public void notifyClosed()
{
if (listeners != null && !listeners.isEmpty())
{
for (NetworkTrafficListener listener : listeners)
{
try
{
listener.closed(getSocket());
}
catch (Exception x)
{
LOG.warn(x);
}
}
}
}
}

View File

@ -87,7 +87,6 @@ public class NetworkTrafficServerConnector extends ServerConnector
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException
{
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
endPoint.notifyOpened();
return endPoint;
}
}

View File

@ -44,7 +44,6 @@ import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@Ignore
public class NetworkTrafficListenerTest
{
private static final byte END_OF_CONTENT = '~';
@ -114,6 +113,7 @@ public class NetworkTrafficListenerTest
{
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
@ -176,6 +176,7 @@ public class NetworkTrafficListenerTest
final String responseContent = "response_content";
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
@ -191,12 +192,14 @@ public class NetworkTrafficListenerTest
final CountDownLatch outgoingLatch = new CountDownLatch(2);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
@ -241,6 +244,7 @@ public class NetworkTrafficListenerTest
final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length());
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
@ -255,19 +259,22 @@ public class NetworkTrafficListenerTest
final AtomicReference<String> incomingData = new AtomicReference<>();
final CountDownLatch incomingLatch = new CountDownLatch(1);
final AtomicReference<String> outgoingData = new AtomicReference<>("");
final CountDownLatch outgoingLatch = new CountDownLatch(4);
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
outgoingLatch.countDown();
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
if (outgoingData.get().endsWith("\r\n0\r\n\r\n"))
outgoingLatch.countDown();
}
});
int port = connector.getLocalPort();
@ -311,6 +318,7 @@ public class NetworkTrafficListenerTest
final String location = "/redirect";
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
request.setHandled(true);
@ -324,12 +332,14 @@ public class NetworkTrafficListenerTest
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8));
incomingLatch.countDown();
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
@ -375,6 +385,7 @@ public class NetworkTrafficListenerTest
{
initConnector(new AbstractHandler()
{
@Override
public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException
{
// Read and discard the request body to make the test more
@ -397,11 +408,13 @@ public class NetworkTrafficListenerTest
final CountDownLatch outgoingLatch = new CountDownLatch(1);
connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter()
{
@Override
public void incoming(Socket socket, ByteBuffer bytes)
{
incomingData.set(incomingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));
}
@Override
public void outgoing(Socket socket, ByteBuffer bytes)
{
outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));