From df26287181935f6a4ad93cff96621786a5d486fc Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 10 Mar 2006 10:21:07 +0000 Subject: [PATCH] minor refactorings to make it easier to derive from git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384755 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/udp/CommandChannel.java | 6 +-- .../udp/DatagramHeaderMarshaller.java | 7 ++- .../activemq/transport/udp/UdpTransport.java | 49 +++++++++++-------- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index 95a46eebfe..dc617c2770 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -29,7 +29,6 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; @@ -50,7 +49,7 @@ public class CommandChannel implements Service { private int datagramSize = 4 * 1024; private DatagramReplayStrategy replayStrategy; private SocketAddress targetAddress; - private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller(); + private DatagramHeaderMarshaller headerMarshaller; private final boolean checkSequenceNumbers; // reading @@ -67,7 +66,7 @@ public class CommandChannel implements Service { private DatagramHeader header = new DatagramHeader(); public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, - DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers) { + DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers, DatagramHeaderMarshaller headerMarshaller) { this.name = name; this.channel = channel; this.wireFormat = wireFormat; @@ -76,6 +75,7 @@ public class CommandChannel implements Service { this.replayStrategy = replayStrategy; this.targetAddress = targetAddress; this.checkSequenceNumbers = checkSequenceNumbers; + this.headerMarshaller = headerMarshaller; } public String toString() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java index 8e519f3a4f..e65c34e297 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java @@ -25,8 +25,12 @@ import java.nio.ByteBuffer; */ public class DatagramHeaderMarshaller { + public DatagramHeader createDatagramHeader() { + return new DatagramHeader(); + } + public DatagramHeader readHeader(ByteBuffer readBuffer) { - DatagramHeader answer = new DatagramHeader(); + DatagramHeader answer = createDatagramHeader(); answer.setCounter(readBuffer.getLong()); answer.setDataSize(readBuffer.getInt()); byte flags = readBuffer.get(); @@ -46,5 +50,4 @@ public class DatagramHeaderMarshaller { public int getHeaderSize(DatagramHeader header) { return 8 + 4 + 1; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index 08166b4356..b3e5633af6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -82,7 +82,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException { this(wireFormat); this.targetAddress = socketAddress; - this.description = "UdpServerConnection@"; + this.description = getProtocolName() + "ServerConnection@"; } /** @@ -92,7 +92,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S this(wireFormat); this.port = port; this.targetAddress = null; - this.description = "UdpServer@"; + this.description = getProtocolName() + "Server@"; } /** @@ -113,7 +113,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S commandChannel.write(command, address); } - public void receivedHeader(DatagramHeader header) { wireFormatHeader = header; } @@ -126,7 +125,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S return description + port; } else { - return "udp://" + targetAddress + "@" + port; + return getProtocolUriScheme() + targetAddress + "@" + port; } } @@ -169,6 +168,22 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } } + /** + * We have received the WireFormatInfo from the server on the actual channel + * we should use for all future communication with the server, so lets set + * the target to be the actual channel that the server has chosen for us to + * talk on. + */ + public void useLastInboundDatagramAsNewTarget() { + if (originalTargetAddress == null) { + originalTargetAddress = targetAddress; + } + SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress(); + if (lastAddress != null) { + targetAddress = lastAddress; + } + } + // Properties // ------------------------------------------------------------------------- public boolean isTrace() { @@ -255,7 +270,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S public OpenWireFormat getWireFormat() { return wireFormat; } - + public boolean isCheckSequenceNumbers() { return checkSequenceNumbers; } @@ -313,7 +328,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S if (bufferPool == null) { bufferPool = new DefaultBufferPool(); } - commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers()); + commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers(), createDatagramHeaderMarshaller()); commandChannel.start(); // lets pass the header & address into the channel so it avoids a @@ -331,21 +346,15 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } } - /** - * We have received the WireFormatInfo from the server on the actual channel - * we should use for all future communication with the server, so lets set - * the target to be the actual channel that the server has chosen for us to - * talk on. - */ - public void useLastInboundDatagramAsNewTarget() { - if (originalTargetAddress == null) { - originalTargetAddress = targetAddress; - } - SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress(); - if (lastAddress != null) { - targetAddress = lastAddress; - } + protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() { + return new DatagramHeaderMarshaller(); } + protected String getProtocolName() { + return "Udp"; + } + protected String getProtocolUriScheme() { + return "udp://"; + } }