From ef400675aac2746086db06ad6e8fa7bf3fb2fe27 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Apr 2014 16:20:10 +1000 Subject: [PATCH] 431519 Fixed NetworkTrafficListener --- .../NetworkTrafficSelectChannelEndPoint.java | 59 +++++++++++-------- .../server/NetworkTrafficServerConnector.java | 1 - .../server/NetworkTrafficListenerTest.java | 21 +++++-- 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java index d45902e36dd..a4b6f7d2a16 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java @@ -19,11 +19,13 @@ package org.eclipse.jetty.io; import java.io.IOException; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; @@ -57,9 +59,11 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint if (b.hasRemaining()) { int position = b.position(); + ByteBuffer view=b.slice(); flushed&=super.flush(b); int l=b.position()-position; - notifyOutgoing(b, position, l); + view.limit(view.position()+l); + notifyOutgoing(view); if (!flushed) break; } @@ -67,9 +71,12 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint return flushed; } + - public void notifyOpened() + @Override + public void onOpen() { + super.onOpen(); if (listeners != null && !listeners.isEmpty()) { for (NetworkTrafficListener listener : listeners) @@ -86,6 +93,27 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint } } + @Override + public void onClose() + { + super.onClose(); + if (listeners != null && !listeners.isEmpty()) + { + for (NetworkTrafficListener listener : listeners) + { + try + { + listener.closed(getSocket()); + } + catch (Exception x) + { + LOG.warn(x); + } + } + } + } + + public void notifyIncoming(ByteBuffer buffer, int read) { if (listeners != null && !listeners.isEmpty() && read > 0) @@ -105,18 +133,16 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint } } - public void notifyOutgoing(ByteBuffer buffer, int position, int written) + public void notifyOutgoing(ByteBuffer view) { - if (listeners != null && !listeners.isEmpty() && written > 0) + if (listeners != null && !listeners.isEmpty() && view.hasRemaining()) { + Socket socket=getSocket(); for (NetworkTrafficListener listener : listeners) { try { - ByteBuffer view = buffer.slice(); - view.position(position); - view.limit(position + written); - listener.outgoing(getSocket(), view); + listener.outgoing(socket, view); } catch (Exception x) { @@ -126,21 +152,4 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint } } - public void notifyClosed() - { - if (listeners != null && !listeners.isEmpty()) - { - for (NetworkTrafficListener listener : listeners) - { - try - { - listener.closed(getSocket()); - } - catch (Exception x) - { - LOG.warn(x); - } - } - } - } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java index 24ac7e47446..34de615be58 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java @@ -87,7 +87,6 @@ public class NetworkTrafficServerConnector extends ServerConnector protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException { NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners); - endPoint.notifyOpened(); return endPoint; } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java index bf84223df11..ca9d7999cb9 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/NetworkTrafficListenerTest.java @@ -44,7 +44,6 @@ import org.junit.After; import org.junit.Ignore; import org.junit.Test; -@Ignore public class NetworkTrafficListenerTest { private static final byte END_OF_CONTENT = '~'; @@ -114,6 +113,7 @@ public class NetworkTrafficListenerTest { initConnector(new AbstractHandler() { + @Override public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { request.setHandled(true); @@ -176,6 +176,7 @@ public class NetworkTrafficListenerTest final String responseContent = "response_content"; initConnector(new AbstractHandler() { + @Override public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { request.setHandled(true); @@ -191,12 +192,14 @@ public class NetworkTrafficListenerTest final CountDownLatch outgoingLatch = new CountDownLatch(2); connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() { + @Override public void incoming(Socket socket, ByteBuffer bytes) { incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingLatch.countDown(); } + @Override public void outgoing(Socket socket, ByteBuffer bytes) { outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); @@ -241,6 +244,7 @@ public class NetworkTrafficListenerTest final String responseChunk2 = "response_content".substring(responseContent.length() / 2, responseContent.length()); initConnector(new AbstractHandler() { + @Override public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { request.setHandled(true); @@ -255,19 +259,22 @@ public class NetworkTrafficListenerTest final AtomicReference incomingData = new AtomicReference<>(); final CountDownLatch incomingLatch = new CountDownLatch(1); final AtomicReference outgoingData = new AtomicReference<>(""); - final CountDownLatch outgoingLatch = new CountDownLatch(4); + final CountDownLatch outgoingLatch = new CountDownLatch(1); connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() { + @Override public void incoming(Socket socket, ByteBuffer bytes) { incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingLatch.countDown(); } + @Override public void outgoing(Socket socket, ByteBuffer bytes) { - outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); - outgoingLatch.countDown(); + outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); + if (outgoingData.get().endsWith("\r\n0\r\n\r\n")) + outgoingLatch.countDown(); } }); int port = connector.getLocalPort(); @@ -311,6 +318,7 @@ public class NetworkTrafficListenerTest final String location = "/redirect"; initConnector(new AbstractHandler() { + @Override public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { request.setHandled(true); @@ -324,12 +332,14 @@ public class NetworkTrafficListenerTest final CountDownLatch outgoingLatch = new CountDownLatch(1); connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() { + @Override public void incoming(Socket socket, ByteBuffer bytes) { incomingData.set(BufferUtil.toString(bytes,StandardCharsets.UTF_8)); incomingLatch.countDown(); } + @Override public void outgoing(Socket socket, ByteBuffer bytes) { outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); @@ -375,6 +385,7 @@ public class NetworkTrafficListenerTest { initConnector(new AbstractHandler() { + @Override public void handle(String uri, Request request, HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws IOException, ServletException { // Read and discard the request body to make the test more @@ -397,11 +408,13 @@ public class NetworkTrafficListenerTest final CountDownLatch outgoingLatch = new CountDownLatch(1); connector.addNetworkTrafficListener(new NetworkTrafficListener.Adapter() { + @Override public void incoming(Socket socket, ByteBuffer bytes) { incomingData.set(incomingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8)); } + @Override public void outgoing(Socket socket, ByteBuffer bytes) { outgoingData.set(outgoingData.get() + BufferUtil.toString(bytes,StandardCharsets.UTF_8));