diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index 7546b73b3a..3284dd0e32 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -16,15 +16,12 @@ */ package org.apache.activemq.transport.udp; -import org.activeio.ByteSequence; import org.apache.activemq.Service; import org.apache.activemq.command.Command; import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.LastPartialCommand; import org.apache.activemq.command.PartialCommand; -import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.transport.TransportListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +54,6 @@ public class CommandChannel implements Service { // reading private Object readLock = new Object(); private ByteBuffer readBuffer; - private SocketAddress lastReadDatagramAddress; // writing private Object writeLock = new Object(); @@ -80,10 +76,6 @@ public class CommandChannel implements Service { } public void start() throws Exception { - // wireFormat.setPrefixPacketSize(false); - wireFormat.setCacheEnabled(false); - wireFormat.setTightEncodingEnabled(true); - bufferPool.setDefaultSize(datagramSize); bufferPool.start(); readBuffer = bufferPool.borrowBuffer(); @@ -96,22 +88,19 @@ public class CommandChannel implements Service { public Command read() throws IOException { Command answer = null; - lastReadDatagramAddress = null; synchronized (readLock) { readBuffer.clear(); - lastReadDatagramAddress = channel.receive(readBuffer); + SocketAddress address = channel.receive(readBuffer); readBuffer.flip(); - Endpoint from = headerMarshaller.createEndpoint(readBuffer, lastReadDatagramAddress); + Endpoint from = headerMarshaller.createEndpoint(readBuffer, address); int remaining = readBuffer.remaining(); - byte[] data = new byte[remaining]; readBuffer.get(data); // TODO could use a DataInput implementation that talks direct to - // the - // ByteBuffer + // the ByteBuffer to avoid object allocation and unnecessary buffering? DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); answer = (Command) wireFormat.unmarshal(dataIn); answer.setFrom(from); @@ -124,15 +113,6 @@ public class CommandChannel implements Service { return answer; } - /** - * Called if a packet is received on a different channel from a remote - * client - * - * @throws IOException - */ - public void setWireFormatInfoEndpoint(DatagramEndpoint endpoint) throws IOException { - } - public void write(Command command) throws IOException { write(command, targetAddress); } @@ -236,11 +216,6 @@ public class CommandChannel implements Service { this.headerMarshaller = headerMarshaller; } - public SocketAddress getLastReadDatagramAddress() { - synchronized (readLock) { - return lastReadDatagramAddress; - } - } // Implementation methods // ------------------------------------------------------------------------- diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index fc226e0daa..bbdeff4e3b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -18,11 +18,12 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.Service; import org.apache.activemq.command.Command; +import org.apache.activemq.command.Endpoint; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportThreadSupport; -import org.apache.activemq.transport.replay.ReplayStrategy; import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy; +import org.apache.activemq.transport.replay.ReplayStrategy; import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,7 +61,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S private int port; private int minmumWireFormatVersion; private String description = null; - private DatagramEndpoint wireFormatHeader; protected UdpTransport(OpenWireFormat wireFormat) throws IOException { this.wireFormat = wireFormat; @@ -105,11 +105,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S checkStarted(command); commandChannel.write(command, address); } - - public void receivedHeader(DatagramEndpoint endpoint) { - wireFormatHeader = endpoint; - } - + /** * @return pretty print of 'this' */ @@ -132,10 +128,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S Command command = commandChannel.read(); doConsume(command); } - /* - * catch (SocketTimeoutException e) { } catch - * (InterruptedIOException e) { } - */ catch (AsynchronousCloseException e) { try { stop(); @@ -168,13 +160,16 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S * the target to be the actual channel that the server has chosen for us to * talk on. */ - public void useLastInboundDatagramAsNewTarget() { - if (originalTargetAddress == null) { - originalTargetAddress = targetAddress; - } - SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress(); - if (lastAddress != null) { - targetAddress = lastAddress; + public void setTargetEndpoint(Endpoint newTarget) { + if (newTarget instanceof DatagramEndpoint) { + DatagramEndpoint endpoint = (DatagramEndpoint) newTarget; + SocketAddress address = endpoint.getAddress(); + if (address != null) { + if (originalTargetAddress == null) { + originalTargetAddress = targetAddress; + } + targetAddress = address; + } } } @@ -318,12 +313,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); commandChannel.start(); - // lets pass the header & address into the channel so it avoids a - // re-request - if (wireFormatHeader != null) { - commandChannel.setWireFormatInfoEndpoint(wireFormatHeader); - } - super.doStart(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index 8edd359f92..cfa9d799f8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -120,9 +120,10 @@ public class UdpTransportFactory extends TransportFactory { protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) { transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { protected void onWireFormatNegotiated(WireFormatInfo info) { - // lets switch to the targetAddress that the last packet was - // received as so that all future requests go to the newly created UDP channel - udpTransport.useLastInboundDatagramAsNewTarget(); + // lets switch to the target endpoint + // based on the last packet that was received + // so that all future requests go to the newly created UDP channel + udpTransport.setTargetEndpoint(info.getFrom()); } }; return transport; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java index 098bf27c05..ca96f4886f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; -import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.CommandJoiner; @@ -145,9 +144,6 @@ public class UdpTransportServer extends TransportServerSupport { final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); final UdpTransport transport = new UdpTransport(connectionWireFormat, address); - // TODO - is this still required? - transport.receivedHeader(endpoint); - Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat); return new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {