From 4f446eb0254d720b318d9c2de1ee712307a85842 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 9 Mar 2006 18:06:32 +0000 Subject: [PATCH] initial spike of UDP server transport with some test cases (some of which are commented out as they are not quite working yet) git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384569 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/project.xml | 2 + .../transport/WireFormatNegotiator.java | 17 +++- .../transport/udp/CommandChannel.java | 65 ++++++++++--- .../transport/udp/CommandProcessor.java | 4 +- .../transport/udp/CommandReadBuffer.java | 1 + .../transport/udp/DatagramHeader.java | 18 +++- .../activemq/transport/udp/UdpTransport.java | 88 ++++++++++++++--- .../transport/udp/UdpTransportFactory.java | 96 ++++++++++++------- .../transport/udp/UdpTransportServer.java | 83 ++++++++++------ .../udp/UdpTransportServerClient.java | 71 -------------- .../UdpSendReceiveWithTwoConnectionsTest.java | 55 +++++++++++ .../transport/udp/UdpTestSupport.java | 94 +++++++++++++----- .../transport/udp/UdpTransportTest.java | 22 ++++- .../udp/UdpTransportUsingServerTest.java | 56 +++++++++++ 14 files changed, 478 insertions(+), 194 deletions(-) delete mode 100755 activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java create mode 100755 activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java diff --git a/activemq-core/project.xml b/activemq-core/project.xml index e194d781ca..401481e5e2 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -361,6 +361,8 @@ **/MultipleTestsWithXBeanFactoryBeanTest.* **/MultipleTestsWithSpringXBeanFactoryBeanTest.* + + **/UdpTransportUsingServerTest.* **/UdpSendReceiveWithTwoConnectionsTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index cb65988d1a..a0a901ba48 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -58,7 +58,10 @@ public class WireFormatNegotiator extends TransportFilter { if( firstStart.compareAndSet(true, false) ) { try { WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); - next.oneway(info); + if (log.isDebugEnabled()) { + log.debug("Sending: " + info); + } + sendWireFormat(info); } finally { wireInfoSentDownLatch.countDown(); } @@ -99,11 +102,12 @@ public class WireFormatNegotiator extends TransportFilter { onException((IOException) new InterruptedIOException().initCause(e)); } readyCountDownLatch.countDown(); - + onWireFormatNegotiated(info); } getTransportListener().onCommand(command); } - + + public void onException(IOException error) { readyCountDownLatch.countDown(); super.onException(error); @@ -112,4 +116,11 @@ public class WireFormatNegotiator extends TransportFilter { public String toString() { return next.toString(); } + + protected void sendWireFormat(WireFormatInfo info) throws IOException { + next.oneway(info); + } + + protected void onWireFormatNegotiated(WireFormatInfo info) { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index beab1d0a35..8d124a3abb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -55,6 +55,7 @@ public class CommandChannel implements Service { private Object readLock = new Object(); private ByteBuffer readBuffer; private CommandReadBuffer readStack; + private SocketAddress lastReadDatagramAddress; // writing private Object writeLock = new Object(); @@ -63,7 +64,8 @@ public class CommandChannel implements Service { private int largeMessageBufferSize = 128 * 1024; private DatagramHeader header = new DatagramHeader(); - public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) { + public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, + DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) { this.channel = channel; this.wireFormat = wireFormat; this.bufferPool = bufferPool; @@ -73,7 +75,7 @@ public class CommandChannel implements Service { } public void start() throws Exception { - //wireFormat.setPrefixPacketSize(false); + // wireFormat.setPrefixPacketSize(false); wireFormat.setCacheEnabled(false); wireFormat.setTightEncodingEnabled(true); @@ -89,33 +91,43 @@ public class CommandChannel implements Service { } public void read(CommandProcessor processor) throws IOException { + DatagramHeader header = null; Command answer = null; - SocketAddress address = null; + lastReadDatagramAddress = null; synchronized (readLock) { readBuffer.clear(); - address = channel.receive(readBuffer); + lastReadDatagramAddress = channel.receive(readBuffer); readBuffer.flip(); + + if (log.isDebugEnabled()) { + log.debug("Read a datagram from: " + lastReadDatagramAddress); + } + header = headerMarshaller.readHeader(readBuffer); + header.setFromAddress(lastReadDatagramAddress); if (log.isDebugEnabled()) { - log.debug("Read a datagram from: " + address); + log.debug("Received datagram from: " + lastReadDatagramAddress + " header: " + header); } - DatagramHeader header = headerMarshaller.readHeader(readBuffer); - int remaining = readBuffer.remaining(); int size = header.getDataSize(); + /* if (size > remaining) { throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining"); } else if (size < remaining) { log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining); } + */ + if (size != remaining) { + log.warn("Expecting: " + size + " but has: " + remaining); + } if (header.isPartial()) { byte[] data = new byte[size]; readBuffer.get(data); header.setPartialData(data); } else { - byte[] data = new byte[size]; + byte[] data = new byte[remaining]; readBuffer.get(data); // TODO use a DataInput implementation that talks direct to the @@ -128,17 +140,28 @@ public class CommandChannel implements Service { answer = readStack.read(header); } if (answer != null) { - processor.process(answer, address); + processor.process(answer, header); } } + /** + * Called if a packet is received on a different channel from a remote client + * @throws IOException + */ + public Command onDatagramReceived(DatagramHeader header) throws IOException { + return readStack.read(header); + } + public void write(Command command) throws IOException { write(command, targetAddress); } - + public void write(Command command, SocketAddress address) throws IOException { synchronized (writeLock) { header.incrementCounter(); + bs = new BooleanStream(); + // TODO + //bs.clear(); int size = wireFormat.tightMarshal1(command, bs); if (size < datagramSize) { header.setPartial(false); @@ -187,11 +210,6 @@ public class CommandChannel implements Service { } } - protected void sendWriteBuffer(SocketAddress address) throws IOException { - writeBuffer.flip(); - channel.send(writeBuffer, address); - } - // Properties // ------------------------------------------------------------------------- @@ -225,5 +243,22 @@ public class CommandChannel implements Service { this.headerMarshaller = headerMarshaller; } + public SocketAddress getLastReadDatagramAddress() { + synchronized (readLock) { + return lastReadDatagramAddress; + } + } + + + // Implementation methods + // ------------------------------------------------------------------------- + protected void sendWriteBuffer(SocketAddress address) throws IOException { + writeBuffer.flip(); + + if (log.isDebugEnabled()) { + log.debug("Sending datagram to: " + address + " header: " + header); + } + channel.send(writeBuffer, address); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java index 4350d8cbd3..cbdfac3e6e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java @@ -18,7 +18,7 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.command.Command; -import java.net.SocketAddress; +import java.io.IOException; /** * A callback used to process inbound commands @@ -27,6 +27,6 @@ import java.net.SocketAddress; */ public interface CommandProcessor { - void process(Command command, SocketAddress address); + void process(Command command, DatagramHeader header) throws IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java index c66e931584..ef41e74ca8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java @@ -93,4 +93,5 @@ public class CommandReadBuffer { return answer; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java index cb1ee7cb9a..bb3e548f5c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java @@ -18,6 +18,8 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.command.Command; +import java.net.SocketAddress; + /** * Represents a header used when sending data grams * @@ -32,6 +34,7 @@ public class DatagramHeader implements Comparable { private int dataSize; // transient caches + private transient SocketAddress fromAddress; private transient byte[] partialData; private transient Command command; @@ -66,6 +69,11 @@ public class DatagramHeader implements Comparable { return getClass().getName().compareTo(that.getClass().getName()); } + + public String toString() { + return "DatagramHeader[producer: " + producerId + " counter: " + counter + " flags: " + getFlags(); + } + public boolean isComplete() { return complete; } @@ -126,6 +134,8 @@ public class DatagramHeader implements Comparable { complete = (flags & 0x2) != 0; } + // Transient cached properties + public Command getCommand() { return command; } @@ -142,6 +152,12 @@ public class DatagramHeader implements Comparable { this.partialData = partialData; } - // Transient cached properties + public SocketAddress getFromAddress() { + return fromAddress; + } + + public void setFromAddress(SocketAddress fromAddress) { + this.fromAddress = fromAddress; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index 3a9ed544aa..aed6cbc38f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -51,15 +51,22 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy(); private int datagramSize = 4 * 1024; private long maxInactivityDuration = 0; // 30000; - private InetSocketAddress targetAddress; + private SocketAddress targetAddress; + private SocketAddress originalTargetAddress; private DatagramChannel channel; private boolean trace = false; private boolean useLocalHost = true; private int port; + private int minmumWireFormatVersion; + private String description = null; + private CommandProcessor commandProcessor = new CommandProcessor() { - public void process(Command command, SocketAddress address) { + public void process(Command command, DatagramHeader header) { doConsume(command); - }}; + } + }; + + private DatagramHeader wireFormatHeader; protected UdpTransport(OpenWireFormat wireFormat) throws IOException { this.wireFormat = wireFormat; @@ -68,13 +75,25 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { this(wireFormat); this.targetAddress = createAddress(remoteLocation); + description = remoteLocation.toString() + "@"; } - public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) throws IOException { + public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException { this(wireFormat); this.targetAddress = socketAddress; + this.description = "UdpServerConnection@"; } - + + /** + * Used by the server transport + */ + public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException { + this(wireFormat); + this.port = port; + this.targetAddress = null; + this.description = "UdpServer@"; + } + /** * A one way asynchronous send */ @@ -85,19 +104,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S /** * A one way asynchronous send to a given address */ - public void oneway(Command command, InetSocketAddress address) throws IOException { + public void oneway(Command command, SocketAddress address) throws IOException { if (log.isDebugEnabled()) { - log.debug("Sending oneway from port: " + port + " to target: " + targetAddress); + log.debug("Sending oneway from: " + this + " to target: " + targetAddress); } checkStarted(command); commandChannel.write(command, address); } + public void doConsume(Command command, DatagramHeader header) throws IOException { + wireFormatHeader = header; + } + /** * @return pretty print of 'this' */ public String toString() { - return "udp://" + targetAddress + "?port=" + port; + if (description != null) { + return description + port; + } + else { + return "udp://" + targetAddress + "@" + port; + } } /** @@ -214,7 +242,18 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S this.port = port; } - + public int getMinmumWireFormatVersion() { + return minmumWireFormatVersion; + } + + public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { + this.minmumWireFormatVersion = minmumWireFormatVersion; + } + + public OpenWireFormat getWireFormat() { + return wireFormat; + } + // Implementation methods // ------------------------------------------------------------------------- protected CommandProcessor getCommandProcessor() { @@ -224,7 +263,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S protected void setCommandProcessor(CommandProcessor commandProcessor) { this.commandProcessor = commandProcessor; } - + /** * Creates an address from the given URI */ @@ -251,19 +290,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S // TODO // connect to default target address to avoid security checks each time // channel = channel.connect(targetAddress); - + DatagramSocket socket = channel.socket(); + if (log.isDebugEnabled()) { + log.debug("Binding to address: " + localAddress); + } socket.bind(localAddress); if (port == 0) { port = socket.getLocalPort(); } - + if (bufferPool == null) { bufferPool = new DefaultBufferPool(); } commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress); commandChannel.start(); + // lets pass the header & address into the channel so it avoids a + // re-request + if (wireFormatHeader != null) { + commandChannel.onDatagramReceived(wireFormatHeader); + } + super.doStart(); } @@ -273,4 +321,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } } + /** + * We have received the WireFormatInfo from the server on the actual channel + * we should use for all future communication with the server, so lets set + * the target to be the actual channel that the server has chosen for us to + * talk on. + */ + public void useLastInboundDatagramAsNewTarget() { + if (originalTargetAddress == null) { + originalTargetAddress = targetAddress; + } + SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress(); + if (lastAddress != null) { + targetAddress = lastAddress; + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index 013fdd69d7..ffb3e53525 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.udp; import org.activeio.command.WireFormat; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.ResponseCorrelator; @@ -24,24 +25,33 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IntrospectionSupport; - -import javax.net.ServerSocketFactory; -import javax.net.SocketFactory; +import org.apache.activemq.util.URISupport; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.HashMap; import java.util.Map; public class UdpTransportFactory extends TransportFactory { public TransportServer doBind(String brokerId, final URI location) throws IOException { try { - UdpTransport transport = (UdpTransport) doConnect(location); - UdpTransportServer server = new UdpTransportServer(transport); + Map options = new HashMap(URISupport.parseParamters(location)); + if (options.containsKey("port")) { + throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax"); + } + WireFormat wf = createWireFormat(options); + int port = location.getPort(); + UdpTransport transport = new UdpTransport(asOpenWireFormat(wf), port); + + Transport configuredTransport = configure(transport, wf, options, true); + UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport); + transport.setCommandProcessor(server); return server; } catch (URISyntaxException e) { @@ -53,45 +63,67 @@ public class UdpTransportFactory extends TransportFactory { } public Transport configure(Transport transport, WireFormat format, Map options) { - IntrospectionSupport.setProperties(transport, options); - UdpTransport tcpTransport = (UdpTransport) transport; + return configure(transport, format, options, false); + } - if (tcpTransport.isTrace()) { + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + IntrospectionSupport.setProperties(transport, options); + final UdpTransport udpTransport = (UdpTransport) transport; + if (udpTransport.isTrace()) { transport = new TransportLogger(transport); } - if (tcpTransport.getMaxInactivityDuration() > 0) { - transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); + if (format instanceof OpenWireFormat) { + transport = configureClientSideNegotiator(transport, format, udpTransport); + } + + if (udpTransport.getMaxInactivityDuration() > 0) { + transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); + } + return transport; + } + + protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { + OpenWireFormat wireFormat = asOpenWireFormat(wf); + wireFormat.setSizePrefixDisabled(true); + return new UdpTransport(wireFormat, location); + } + + protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) { + IntrospectionSupport.setProperties(transport, options); + UdpTransport udpTransport = (UdpTransport) transport; + + if (udpTransport.isTrace()) { + transport = new TransportLogger(transport); + } + + if (!server && format instanceof OpenWireFormat) { + transport = configureClientSideNegotiator(transport, format, udpTransport); + } + + if (udpTransport.getMaxInactivityDuration() > 0) { + transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); } transport = new ResponseCorrelator(transport); return transport; } - public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - IntrospectionSupport.setProperties(transport, options); - UdpTransport tcpTransport = (UdpTransport) transport; - if (tcpTransport.isTrace()) { - transport = new TransportLogger(transport); - } - - if (tcpTransport.getMaxInactivityDuration() > 0) { - transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); - } + protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) { + transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { + protected void onWireFormatNegotiated(WireFormatInfo info) { + // lets switch to the targetAddress that the last packet was + // received as + udpTransport.useLastInboundDatagramAsNewTarget(); + } + }; return transport; } - protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { - OpenWireFormat wireFormat = (OpenWireFormat) wf; - wireFormat.setSizePrefixDisabled(true); - return new UdpTransport(wireFormat, location); - } - - protected ServerSocketFactory createServerSocketFactory() { - return ServerSocketFactory.getDefault(); - } - - protected SocketFactory createSocketFactory() { - return SocketFactory.getDefault(); + protected OpenWireFormat asOpenWireFormat(WireFormat wf) { + OpenWireFormat answer = (OpenWireFormat) wf; + answer.setSizePrefixDisabled(true); + answer.setCacheEnabled(false); + return answer; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java index 2155fc2827..5e19bd5d46 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -18,18 +18,23 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerSupport; -import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; import java.net.SocketAddress; +import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -39,14 +44,17 @@ import java.util.Map; * @version $Revision$ */ -public class UdpTransportServer extends TransportServerSupport { +public class UdpTransportServer extends TransportServerSupport implements CommandProcessor { private static final Log log = LogFactory.getLog(UdpTransportServer.class); private UdpTransport serverTransport; + private Transport configuredTransport; private Map transports = new HashMap(); - public UdpTransportServer(UdpTransport serverTransport) { + public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) { + super(connectURI); this.serverTransport = serverTransport; + this.configuredTransport = configuredTransport; } public String toString() { @@ -64,56 +72,71 @@ public class UdpTransportServer extends TransportServerSupport { } protected void doStart() throws Exception { - serverTransport.start(); - serverTransport.setCommandProcessor(new CommandProcessor() { - public void process(Command command, SocketAddress address) { - onInboundCommand(command, address); + log.info("Starting " + this); + + configuredTransport.setTransportListener(new TransportListener() { + public void onCommand(Command command) { + } + + public void onException(IOException error) { + } + + public void transportInterupted() { + } + + public void transportResumed() { } }); + configuredTransport.start(); } protected void doStop(ServiceStopper stopper) throws Exception { - serverTransport.stop(); + configuredTransport.stop(); } - protected void onInboundCommand(Command command, SocketAddress address) { + public void process(Command command, DatagramHeader header) throws IOException { + SocketAddress address = header.getFromAddress(); + System.out.println(toString() + " received command: " + command + " from address: " + address); Transport transport = null; synchronized (transports) { transport = (Transport) transports.get(address); if (transport == null) { - transport = createTransport(address); + System.out.println("###Êcreating new server connector"); + transport = createTransport(command, header); transport = configureTransport(transport); transports.put(address, transport); } - } - processInboundCommand(command, transport); - } - - public void sendOutboundCommand(Command command, SocketAddress address) { - // TODO we should use an inbound buffer to make this async - - } - - protected void processInboundCommand(Command command, Transport transport) { - // TODO - consider making this asynchronous - TransportListener listener = transport.getTransportListener(); - if (listener != null) { - listener.onCommand(command); - } - else { - log.error("No transportListener available for transport: " + transport + " to process inbound command: " + command); + else { + log.warn("Discarding duplicate command to server: " + command + " from: " + address); + } } } protected Transport configureTransport(Transport transport) { transport = new ResponseCorrelator(transport); - transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); + + // TODO + //transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); + getAcceptListener().onAccept(transport); return transport; } - protected TransportSupport createTransport(SocketAddress address) { - return new UdpTransportServerClient(this, address); + protected Transport createTransport(Command command, DatagramHeader header) throws IOException { + final SocketAddress address = header.getFromAddress(); + // TODO lets copy the wireformat... + final UdpTransport transport = new UdpTransport(serverTransport.getWireFormat(), address); + + // lets send the packet into the transport so it can track packets + transport.doConsume(command, header); + + return new WireFormatNegotiator(transport, serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) { + + // lets use the specific addressing of wire format + protected void sendWireFormat(WireFormatInfo info) throws IOException { + transport.oneway(info, address); + } + }; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java deleted file mode 100755 index 922fc9653c..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.udp; - -import org.apache.activemq.command.Command; -import org.apache.activemq.transport.TransportSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.IOException; -import java.net.SocketAddress; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -/** - * A logical server side transport instance for a remote client which works with - * the {@link UdpTransportServer} - * - * @version $Revision$ - */ -public class UdpTransportServerClient extends TransportSupport { - private static final Log log = LogFactory.getLog(UdpTransportServerClient.class); - - private UdpTransportServer server; - private SocketAddress address; - private List queue = Collections.synchronizedList(new LinkedList()); - - public UdpTransportServerClient(UdpTransportServer server, SocketAddress address) { - this.server = server; - this.address = address; - } - - public String toString() { - return "UdpClient@" + address; - } - - public void oneway(Command command) throws IOException { - checkStarted(command); - server.sendOutboundCommand(command, address); - } - - protected void doStart() throws Exception { - for (Iterator iter = queue.iterator(); iter.hasNext();) { - Command command = (Command) iter.next(); - doConsume(command); - iter.remove(); - } - } - - protected void doStop(ServiceStopper stopper) throws Exception { - queue.clear(); - } - -} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java new file mode 100755 index 0000000000..55bdb321f6 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java @@ -0,0 +1,55 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; + +/** + * @version + */ +public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWithTwoConnectionsTest { + + protected String brokerURI = "udp://localhost:8891"; + protected BrokerService broker; + + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.addConnector(brokerURI); + return answer; + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(brokerURI); + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java index 3741d0fd89..7e8c2faf11 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java @@ -18,39 +18,50 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; import java.io.IOException; +import java.net.URI; import junit.framework.TestCase; /** - * + * * @version $Revision$ */ -public abstract class UdpTestSupport extends TestCase implements TransportListener { - - protected abstract Transport createConsumer() throws Exception; - - protected abstract Transport createProducer() throws Exception; +public abstract class UdpTestSupport extends TestCase implements TransportListener { protected Transport producer; protected Transport consumer; protected Object lock = new Object(); protected Command receivedCommand; - + private TransportServer server; + public void testSendingSmallMessage() throws Exception { ConsumerInfo expected = new ConsumerInfo(); expected.setSelector("Cheese"); + expected.setExclusive(true); + expected.setCommandId((short) 12); + expected.setExclusive(true); + expected.setPrefetchSize(3456); + try { + System.out.println("About to send: " + expected); producer.oneway(expected); - + Command received = assertCommandReceived(); assertTrue("Should have received a ConsumerInfo but was: " + received, received instanceof ConsumerInfo); ConsumerInfo actual = (ConsumerInfo) received; assertEquals("Selector", expected.getSelector(), actual.getSelector()); + assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive()); + assertEquals("getCommandId", expected.getCommandId(), actual.getCommandId()); + assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize()); } catch (Exception e) { System.out.println("Caught: " + e); @@ -60,27 +71,49 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen } protected void setUp() throws Exception { + server = createServer(); + if (server != null) { + server.setAcceptListener(new TransportAcceptListener() { + + public void onAccept(Transport transport) { + consumer = transport; + consumer.setTransportListener(UdpTestSupport.this); + try { + consumer.start(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void onAcceptError(Exception error) { + } + }); + server.start(); + } + consumer = createConsumer(); + if (consumer != null) { + consumer.setTransportListener(this); + consumer.start(); + } + producer = createProducer(); - - consumer.setTransportListener(this); producer.setTransportListener(new TransportListener() { public void onCommand(Command command) { } - + public void onException(IOException error) { } - + public void transportInterupted() { } - + public void transportResumed() { } }); - - consumer.start(); + producer.start(); - } protected void tearDown() throws Exception { @@ -90,14 +123,22 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen if (consumer != null) { consumer.stop(); } + if (server != null) { + server.stop(); + } } public void onCommand(Command command) { - System.out.println("### Received command: " + command); - - synchronized (lock) { - receivedCommand = command; - lock.notifyAll(); + if (command instanceof WireFormatInfo) { + System.out.println("Got WireFormatInfo: " + command); + } + else { + System.out.println("### Received command: " + command); + + synchronized (lock) { + receivedCommand = command; + lock.notifyAll(); + } } } @@ -113,16 +154,23 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen System.out.println("### Transport resumed"); } - protected Command assertCommandReceived() throws InterruptedException { Command answer = null; synchronized (lock) { lock.wait(5000); answer = receivedCommand; } - + assertNotNull("Should have received a Command by now!", answer); return answer; } + protected abstract Transport createConsumer() throws Exception; + + protected abstract Transport createProducer() throws Exception; + + protected TransportServer createServer() throws Exception { + return null; + } + } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java index dd9a1cec5e..be8d188a1f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.udp; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; @@ -27,17 +28,28 @@ import java.net.URI; */ public class UdpTransportTest extends UdpTestSupport { - protected String producerURI = "udp://localhost:8830"; - protected String consumerURI = "udp://localhost:8831?port=8830"; + protected int consumerPort = 8830; + protected String producerURI = "udp://localhost:" + consumerPort; + //protected String producerURI = "udp://localhost:8830"; + //protected String consumerURI = "udp://localhost:8831?port=8830"; protected Transport createProducer() throws Exception { System.out.println("Producer using URI: " + producerURI); - return TransportFactory.connect(new URI(producerURI)); + + // The WireFormatNegotiator means we can only connect to servers + return new UdpTransport(createWireFormat(), new URI(producerURI)); + + //return TransportFactory.connect(new URI(producerURI)); } protected Transport createConsumer() throws Exception { - System.out.println("Consumer using URI: " + consumerURI); - return TransportFactory.connect(new URI(consumerURI)); + System.out.println("Consumer on port: " + consumerPort); + return new UdpTransport(createWireFormat(), consumerPort); + //return TransportFactory.connect(new URI(consumerURI)); + } + + protected OpenWireFormat createWireFormat() { + return new OpenWireFormat(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java new file mode 100644 index 0000000000..22513b0642 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java @@ -0,0 +1,56 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportServer; + +import java.net.URI; + +/** + * + * @version $Revision$ + */ +public class UdpTransportUsingServerTest extends UdpTestSupport { + + protected int consumerPort = 8830; + protected String producerURI = "udp://localhost:" + consumerPort; + protected String serverURI = producerURI; + + protected Transport createProducer() throws Exception { + System.out.println("Producer using URI: " + producerURI); + return TransportFactory.connect(new URI(producerURI)); + } + + protected TransportServer createServer() throws Exception { + return TransportFactory.bind("byBroker", new URI(serverURI)); + } + + protected Transport createConsumer() throws Exception { + return null; + } + + protected OpenWireFormat createWireFormat() { + OpenWireFormat answer = new OpenWireFormat(); + answer.setCacheEnabled(false); + answer.setSizePrefixDisabled(true); + return answer; + } + +}