initial spike of UDP server transport with some test cases (some of which are commented out as they are not quite working yet)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384569 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-09 18:06:32 +00:00
parent 22bb1bdd41
commit 4f446eb025
14 changed files with 478 additions and 194 deletions

View File

@ -361,6 +361,8 @@
<exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
<!-- TODO FIXME -->
<exclude>**/UdpTransportUsingServerTest.*</exclude>
<exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude> <exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
</excludes> </excludes>
</unitTest> </unitTest>

View File

@ -58,7 +58,10 @@ public class WireFormatNegotiator extends TransportFilter {
if( firstStart.compareAndSet(true, false) ) { if( firstStart.compareAndSet(true, false) ) {
try { try {
WireFormatInfo info = wireFormat.getPreferedWireFormatInfo(); WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
next.oneway(info); if (log.isDebugEnabled()) {
log.debug("Sending: " + info);
}
sendWireFormat(info);
} finally { } finally {
wireInfoSentDownLatch.countDown(); wireInfoSentDownLatch.countDown();
} }
@ -99,11 +102,12 @@ public class WireFormatNegotiator extends TransportFilter {
onException((IOException) new InterruptedIOException().initCause(e)); onException((IOException) new InterruptedIOException().initCause(e));
} }
readyCountDownLatch.countDown(); readyCountDownLatch.countDown();
onWireFormatNegotiated(info);
} }
getTransportListener().onCommand(command); getTransportListener().onCommand(command);
} }
public void onException(IOException error) { public void onException(IOException error) {
readyCountDownLatch.countDown(); readyCountDownLatch.countDown();
super.onException(error); super.onException(error);
@ -112,4 +116,11 @@ public class WireFormatNegotiator extends TransportFilter {
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }
protected void sendWireFormat(WireFormatInfo info) throws IOException {
next.oneway(info);
}
protected void onWireFormatNegotiated(WireFormatInfo info) {
}
} }

View File

@ -55,6 +55,7 @@ public class CommandChannel implements Service {
private Object readLock = new Object(); private Object readLock = new Object();
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
private CommandReadBuffer readStack; private CommandReadBuffer readStack;
private SocketAddress lastReadDatagramAddress;
// writing // writing
private Object writeLock = new Object(); private Object writeLock = new Object();
@ -63,7 +64,8 @@ public class CommandChannel implements Service {
private int largeMessageBufferSize = 128 * 1024; private int largeMessageBufferSize = 128 * 1024;
private DatagramHeader header = new DatagramHeader(); private DatagramHeader header = new DatagramHeader();
public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) { public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) {
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
@ -73,7 +75,7 @@ public class CommandChannel implements Service {
} }
public void start() throws Exception { public void start() throws Exception {
//wireFormat.setPrefixPacketSize(false); // wireFormat.setPrefixPacketSize(false);
wireFormat.setCacheEnabled(false); wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true); wireFormat.setTightEncodingEnabled(true);
@ -89,33 +91,43 @@ public class CommandChannel implements Service {
} }
public void read(CommandProcessor processor) throws IOException { public void read(CommandProcessor processor) throws IOException {
DatagramHeader header = null;
Command answer = null; Command answer = null;
SocketAddress address = null; lastReadDatagramAddress = null;
synchronized (readLock) { synchronized (readLock) {
readBuffer.clear(); readBuffer.clear();
address = channel.receive(readBuffer); lastReadDatagramAddress = channel.receive(readBuffer);
readBuffer.flip(); readBuffer.flip();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Read a datagram from: " + address); log.debug("Read a datagram from: " + lastReadDatagramAddress);
} }
DatagramHeader header = headerMarshaller.readHeader(readBuffer); header = headerMarshaller.readHeader(readBuffer);
header.setFromAddress(lastReadDatagramAddress);
if (log.isDebugEnabled()) {
log.debug("Received datagram from: " + lastReadDatagramAddress + " header: " + header);
}
int remaining = readBuffer.remaining(); int remaining = readBuffer.remaining();
int size = header.getDataSize(); int size = header.getDataSize();
/*
if (size > remaining) { if (size > remaining) {
throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining"); throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
} }
else if (size < remaining) { else if (size < remaining) {
log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining); log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
} }
*/
if (size != remaining) {
log.warn("Expecting: " + size + " but has: " + remaining);
}
if (header.isPartial()) { if (header.isPartial()) {
byte[] data = new byte[size]; byte[] data = new byte[size];
readBuffer.get(data); readBuffer.get(data);
header.setPartialData(data); header.setPartialData(data);
} }
else { else {
byte[] data = new byte[size]; byte[] data = new byte[remaining];
readBuffer.get(data); readBuffer.get(data);
// TODO use a DataInput implementation that talks direct to the // TODO use a DataInput implementation that talks direct to the
@ -128,10 +140,18 @@ public class CommandChannel implements Service {
answer = readStack.read(header); answer = readStack.read(header);
} }
if (answer != null) { if (answer != null) {
processor.process(answer, address); processor.process(answer, header);
} }
} }
/**
* Called if a packet is received on a different channel from a remote client
* @throws IOException
*/
public Command onDatagramReceived(DatagramHeader header) throws IOException {
return readStack.read(header);
}
public void write(Command command) throws IOException { public void write(Command command) throws IOException {
write(command, targetAddress); write(command, targetAddress);
} }
@ -139,6 +159,9 @@ public class CommandChannel implements Service {
public void write(Command command, SocketAddress address) throws IOException { public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
header.incrementCounter(); header.incrementCounter();
bs = new BooleanStream();
// TODO
//bs.clear();
int size = wireFormat.tightMarshal1(command, bs); int size = wireFormat.tightMarshal1(command, bs);
if (size < datagramSize) { if (size < datagramSize) {
header.setPartial(false); header.setPartial(false);
@ -187,11 +210,6 @@ public class CommandChannel implements Service {
} }
} }
protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
channel.send(writeBuffer, address);
}
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -225,5 +243,22 @@ public class CommandChannel implements Service {
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
} }
public SocketAddress getLastReadDatagramAddress() {
synchronized (readLock) {
return lastReadDatagramAddress;
}
}
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
if (log.isDebugEnabled()) {
log.debug("Sending datagram to: " + address + " header: " + header);
}
channel.send(writeBuffer, address);
}
} }

View File

@ -18,7 +18,7 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import java.net.SocketAddress; import java.io.IOException;
/** /**
* A callback used to process inbound commands * A callback used to process inbound commands
@ -27,6 +27,6 @@ import java.net.SocketAddress;
*/ */
public interface CommandProcessor { public interface CommandProcessor {
void process(Command command, SocketAddress address); void process(Command command, DatagramHeader header) throws IOException;
} }

View File

@ -93,4 +93,5 @@ public class CommandReadBuffer {
return answer; return answer;
} }
} }

View File

@ -18,6 +18,8 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import java.net.SocketAddress;
/** /**
* Represents a header used when sending data grams * Represents a header used when sending data grams
* *
@ -32,6 +34,7 @@ public class DatagramHeader implements Comparable {
private int dataSize; private int dataSize;
// transient caches // transient caches
private transient SocketAddress fromAddress;
private transient byte[] partialData; private transient byte[] partialData;
private transient Command command; private transient Command command;
@ -66,6 +69,11 @@ public class DatagramHeader implements Comparable {
return getClass().getName().compareTo(that.getClass().getName()); return getClass().getName().compareTo(that.getClass().getName());
} }
public String toString() {
return "DatagramHeader[producer: " + producerId + " counter: " + counter + " flags: " + getFlags();
}
public boolean isComplete() { public boolean isComplete() {
return complete; return complete;
} }
@ -126,6 +134,8 @@ public class DatagramHeader implements Comparable {
complete = (flags & 0x2) != 0; complete = (flags & 0x2) != 0;
} }
// Transient cached properties
public Command getCommand() { public Command getCommand() {
return command; return command;
} }
@ -142,6 +152,12 @@ public class DatagramHeader implements Comparable {
this.partialData = partialData; this.partialData = partialData;
} }
// Transient cached properties public SocketAddress getFromAddress() {
return fromAddress;
}
public void setFromAddress(SocketAddress fromAddress) {
this.fromAddress = fromAddress;
}
} }

View File

@ -51,15 +51,22 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy(); private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
private int datagramSize = 4 * 1024; private int datagramSize = 4 * 1024;
private long maxInactivityDuration = 0; // 30000; private long maxInactivityDuration = 0; // 30000;
private InetSocketAddress targetAddress; private SocketAddress targetAddress;
private SocketAddress originalTargetAddress;
private DatagramChannel channel; private DatagramChannel channel;
private boolean trace = false; private boolean trace = false;
private boolean useLocalHost = true; private boolean useLocalHost = true;
private int port; private int port;
private int minmumWireFormatVersion;
private String description = null;
private CommandProcessor commandProcessor = new CommandProcessor() { private CommandProcessor commandProcessor = new CommandProcessor() {
public void process(Command command, SocketAddress address) { public void process(Command command, DatagramHeader header) {
doConsume(command); doConsume(command);
}}; }
};
private DatagramHeader wireFormatHeader;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException { protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
@ -68,11 +75,23 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
this(wireFormat); this(wireFormat);
this.targetAddress = createAddress(remoteLocation); this.targetAddress = createAddress(remoteLocation);
description = remoteLocation.toString() + "@";
} }
public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) throws IOException { public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
this(wireFormat); this(wireFormat);
this.targetAddress = socketAddress; this.targetAddress = socketAddress;
this.description = "UdpServerConnection@";
}
/**
* Used by the server transport
*/
public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
this(wireFormat);
this.port = port;
this.targetAddress = null;
this.description = "UdpServer@";
} }
/** /**
@ -85,19 +104,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/** /**
* A one way asynchronous send to a given address * A one way asynchronous send to a given address
*/ */
public void oneway(Command command, InetSocketAddress address) throws IOException { public void oneway(Command command, SocketAddress address) throws IOException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Sending oneway from port: " + port + " to target: " + targetAddress); log.debug("Sending oneway from: " + this + " to target: " + targetAddress);
} }
checkStarted(command); checkStarted(command);
commandChannel.write(command, address); commandChannel.write(command, address);
} }
public void doConsume(Command command, DatagramHeader header) throws IOException {
wireFormatHeader = header;
}
/** /**
* @return pretty print of 'this' * @return pretty print of 'this'
*/ */
public String toString() { public String toString() {
return "udp://" + targetAddress + "?port=" + port; if (description != null) {
return description + port;
}
else {
return "udp://" + targetAddress + "@" + port;
}
} }
/** /**
@ -214,6 +242,17 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.port = port; this.port = port;
} }
public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion;
}
public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
this.minmumWireFormatVersion = minmumWireFormatVersion;
}
public OpenWireFormat getWireFormat() {
return wireFormat;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -253,6 +292,9 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
// channel = channel.connect(targetAddress); // channel = channel.connect(targetAddress);
DatagramSocket socket = channel.socket(); DatagramSocket socket = channel.socket();
if (log.isDebugEnabled()) {
log.debug("Binding to address: " + localAddress);
}
socket.bind(localAddress); socket.bind(localAddress);
if (port == 0) { if (port == 0) {
port = socket.getLocalPort(); port = socket.getLocalPort();
@ -264,6 +306,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress); commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress);
commandChannel.start(); commandChannel.start();
// lets pass the header & address into the channel so it avoids a
// re-request
if (wireFormatHeader != null) {
commandChannel.onDatagramReceived(wireFormatHeader);
}
super.doStart(); super.doStart();
} }
@ -273,4 +321,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
} }
} }
/**
* We have received the WireFormatInfo from the server on the actual channel
* we should use for all future communication with the server, so lets set
* the target to be the actual channel that the server has chosen for us to
* talk on.
*/
public void useLastInboundDatagramAsNewTarget() {
if (originalTargetAddress == null) {
originalTargetAddress = targetAddress;
}
SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
if (lastAddress != null) {
targetAddress = lastAddress;
}
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.activeio.command.WireFormat; import org.activeio.command.WireFormat;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.ResponseCorrelator;
@ -24,24 +25,33 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class UdpTransportFactory extends TransportFactory { public class UdpTransportFactory extends TransportFactory {
public TransportServer doBind(String brokerId, final URI location) throws IOException { public TransportServer doBind(String brokerId, final URI location) throws IOException {
try { try {
UdpTransport transport = (UdpTransport) doConnect(location); Map options = new HashMap(URISupport.parseParamters(location));
UdpTransportServer server = new UdpTransportServer(transport); if (options.containsKey("port")) {
throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax");
}
WireFormat wf = createWireFormat(options);
int port = location.getPort();
UdpTransport transport = new UdpTransport(asOpenWireFormat(wf), port);
Transport configuredTransport = configure(transport, wf, options, true);
UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport);
transport.setCommandProcessor(server);
return server; return server;
} }
catch (URISyntaxException e) { catch (URISyntaxException e) {
@ -53,45 +63,67 @@ public class UdpTransportFactory extends TransportFactory {
} }
public Transport configure(Transport transport, WireFormat format, Map options) { public Transport configure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options); return configure(transport, format, options, false);
UdpTransport tcpTransport = (UdpTransport) transport; }
if (tcpTransport.isTrace()) { public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
final UdpTransport udpTransport = (UdpTransport) transport;
if (udpTransport.isTrace()) {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
if (tcpTransport.getMaxInactivityDuration() > 0) { if (format instanceof OpenWireFormat) {
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration()); transport = configureClientSideNegotiator(transport, format, udpTransport);
}
if (udpTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
}
return transport;
}
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
OpenWireFormat wireFormat = asOpenWireFormat(wf);
wireFormat.setSizePrefixDisabled(true);
return new UdpTransport(wireFormat, location);
}
protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) {
IntrospectionSupport.setProperties(transport, options);
UdpTransport udpTransport = (UdpTransport) transport;
if (udpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
if (!server && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport);
}
if (udpTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
} }
transport = new ResponseCorrelator(transport); transport = new ResponseCorrelator(transport);
return transport; return transport;
} }
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
IntrospectionSupport.setProperties(transport, options); transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
UdpTransport tcpTransport = (UdpTransport) transport; protected void onWireFormatNegotiated(WireFormatInfo info) {
if (tcpTransport.isTrace()) { // lets switch to the targetAddress that the last packet was
transport = new TransportLogger(transport); // received as
} udpTransport.useLastInboundDatagramAsNewTarget();
}
if (tcpTransport.getMaxInactivityDuration() > 0) { };
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
}
return transport; return transport;
} }
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
OpenWireFormat wireFormat = (OpenWireFormat) wf; OpenWireFormat answer = (OpenWireFormat) wf;
wireFormat.setSizePrefixDisabled(true); answer.setSizePrefixDisabled(true);
return new UdpTransport(wireFormat, location); answer.setCacheEnabled(false);
} return answer;
protected ServerSocketFactory createServerSocketFactory() {
return ServerSocketFactory.getDefault();
}
protected SocketFactory createSocketFactory() {
return SocketFactory.getDefault();
} }
} }

View File

@ -18,18 +18,23 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport; import org.apache.activemq.transport.TransportServerSupport;
import org.apache.activemq.transport.TransportSupport; import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -39,14 +44,17 @@ import java.util.Map;
* @version $Revision$ * @version $Revision$
*/ */
public class UdpTransportServer extends TransportServerSupport { public class UdpTransportServer extends TransportServerSupport implements CommandProcessor {
private static final Log log = LogFactory.getLog(UdpTransportServer.class); private static final Log log = LogFactory.getLog(UdpTransportServer.class);
private UdpTransport serverTransport; private UdpTransport serverTransport;
private Transport configuredTransport;
private Map transports = new HashMap(); private Map transports = new HashMap();
public UdpTransportServer(UdpTransport serverTransport) { public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) {
super(connectURI);
this.serverTransport = serverTransport; this.serverTransport = serverTransport;
this.configuredTransport = configuredTransport;
} }
public String toString() { public String toString() {
@ -64,56 +72,71 @@ public class UdpTransportServer extends TransportServerSupport {
} }
protected void doStart() throws Exception { protected void doStart() throws Exception {
serverTransport.start(); log.info("Starting " + this);
serverTransport.setCommandProcessor(new CommandProcessor() {
public void process(Command command, SocketAddress address) { configuredTransport.setTransportListener(new TransportListener() {
onInboundCommand(command, address); public void onCommand(Command command) {
}
public void onException(IOException error) {
}
public void transportInterupted() {
}
public void transportResumed() {
} }
}); });
configuredTransport.start();
} }
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
serverTransport.stop(); configuredTransport.stop();
} }
protected void onInboundCommand(Command command, SocketAddress address) { public void process(Command command, DatagramHeader header) throws IOException {
SocketAddress address = header.getFromAddress();
System.out.println(toString() + " received command: " + command + " from address: " + address);
Transport transport = null; Transport transport = null;
synchronized (transports) { synchronized (transports) {
transport = (Transport) transports.get(address); transport = (Transport) transports.get(address);
if (transport == null) { if (transport == null) {
transport = createTransport(address); System.out.println("###Êcreating new server connector");
transport = createTransport(command, header);
transport = configureTransport(transport); transport = configureTransport(transport);
transports.put(address, transport); transports.put(address, transport);
} }
} else {
processInboundCommand(command, transport); log.warn("Discarding duplicate command to server: " + command + " from: " + address);
} }
public void sendOutboundCommand(Command command, SocketAddress address) {
// TODO we should use an inbound buffer to make this async
}
protected void processInboundCommand(Command command, Transport transport) {
// TODO - consider making this asynchronous
TransportListener listener = transport.getTransportListener();
if (listener != null) {
listener.onCommand(command);
}
else {
log.error("No transportListener available for transport: " + transport + " to process inbound command: " + command);
} }
} }
protected Transport configureTransport(Transport transport) { protected Transport configureTransport(Transport transport) {
transport = new ResponseCorrelator(transport); transport = new ResponseCorrelator(transport);
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
// TODO
//transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
getAcceptListener().onAccept(transport); getAcceptListener().onAccept(transport);
return transport; return transport;
} }
protected TransportSupport createTransport(SocketAddress address) { protected Transport createTransport(Command command, DatagramHeader header) throws IOException {
return new UdpTransportServerClient(this, address); final SocketAddress address = header.getFromAddress();
// TODO lets copy the wireformat...
final UdpTransport transport = new UdpTransport(serverTransport.getWireFormat(), address);
// lets send the packet into the transport so it can track packets
transport.doConsume(command, header);
return new WireFormatNegotiator(transport, serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
// lets use the specific addressing of wire format
protected void sendWireFormat(WireFormatInfo info) throws IOException {
transport.oneway(info, address);
}
};
} }
} }

View File

@ -1,71 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* A logical server side transport instance for a remote client which works with
* the {@link UdpTransportServer}
*
* @version $Revision$
*/
public class UdpTransportServerClient extends TransportSupport {
private static final Log log = LogFactory.getLog(UdpTransportServerClient.class);
private UdpTransportServer server;
private SocketAddress address;
private List queue = Collections.synchronizedList(new LinkedList());
public UdpTransportServerClient(UdpTransportServer server, SocketAddress address) {
this.server = server;
this.address = address;
}
public String toString() {
return "UdpClient@" + address;
}
public void oneway(Command command) throws IOException {
checkStarted(command);
server.sendOutboundCommand(command, address);
}
protected void doStart() throws Exception {
for (Iterator iter = queue.iterator(); iter.hasNext();) {
Command command = (Command) iter.next();
doConsume(command);
iter.remove();
}
}
protected void doStop(ServiceStopper stopper) throws Exception {
queue.clear();
}
}

View File

@ -0,0 +1,55 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
/**
* @version
*/
public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
protected String brokerURI = "udp://localhost:8891";
protected BrokerService broker;
protected void setUp() throws Exception {
broker = createBroker();
broker.start();
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
if (broker != null) {
broker.stop();
}
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
answer.addConnector(brokerURI);
return answer;
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(brokerURI);
}
}

View File

@ -18,10 +18,15 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -29,28 +34,34 @@ import junit.framework.TestCase;
* *
* @version $Revision$ * @version $Revision$
*/ */
public abstract class UdpTestSupport extends TestCase implements TransportListener { public abstract class UdpTestSupport extends TestCase implements TransportListener {
protected abstract Transport createConsumer() throws Exception;
protected abstract Transport createProducer() throws Exception;
protected Transport producer; protected Transport producer;
protected Transport consumer; protected Transport consumer;
protected Object lock = new Object(); protected Object lock = new Object();
protected Command receivedCommand; protected Command receivedCommand;
private TransportServer server;
public void testSendingSmallMessage() throws Exception { public void testSendingSmallMessage() throws Exception {
ConsumerInfo expected = new ConsumerInfo(); ConsumerInfo expected = new ConsumerInfo();
expected.setSelector("Cheese"); expected.setSelector("Cheese");
expected.setExclusive(true);
expected.setCommandId((short) 12);
expected.setExclusive(true);
expected.setPrefetchSize(3456);
try { try {
System.out.println("About to send: " + expected);
producer.oneway(expected); producer.oneway(expected);
Command received = assertCommandReceived(); Command received = assertCommandReceived();
assertTrue("Should have received a ConsumerInfo but was: " + received, received instanceof ConsumerInfo); assertTrue("Should have received a ConsumerInfo but was: " + received, received instanceof ConsumerInfo);
ConsumerInfo actual = (ConsumerInfo) received; ConsumerInfo actual = (ConsumerInfo) received;
assertEquals("Selector", expected.getSelector(), actual.getSelector()); assertEquals("Selector", expected.getSelector(), actual.getSelector());
assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive());
assertEquals("getCommandId", expected.getCommandId(), actual.getCommandId());
assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize());
} }
catch (Exception e) { catch (Exception e) {
System.out.println("Caught: " + e); System.out.println("Caught: " + e);
@ -60,10 +71,34 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {
consumer = createConsumer(); server = createServer();
producer = createProducer(); if (server != null) {
server.setAcceptListener(new TransportAcceptListener() {
consumer.setTransportListener(this); public void onAccept(Transport transport) {
consumer = transport;
consumer.setTransportListener(UdpTestSupport.this);
try {
consumer.start();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public void onAcceptError(Exception error) {
}
});
server.start();
}
consumer = createConsumer();
if (consumer != null) {
consumer.setTransportListener(this);
consumer.start();
}
producer = createProducer();
producer.setTransportListener(new TransportListener() { producer.setTransportListener(new TransportListener() {
public void onCommand(Command command) { public void onCommand(Command command) {
} }
@ -78,9 +113,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
} }
}); });
consumer.start();
producer.start(); producer.start();
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
@ -90,14 +123,22 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
if (consumer != null) { if (consumer != null) {
consumer.stop(); consumer.stop();
} }
if (server != null) {
server.stop();
}
} }
public void onCommand(Command command) { public void onCommand(Command command) {
System.out.println("### Received command: " + command); if (command instanceof WireFormatInfo) {
System.out.println("Got WireFormatInfo: " + command);
}
else {
System.out.println("### Received command: " + command);
synchronized (lock) { synchronized (lock) {
receivedCommand = command; receivedCommand = command;
lock.notifyAll(); lock.notifyAll();
}
} }
} }
@ -113,7 +154,6 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
System.out.println("### Transport resumed"); System.out.println("### Transport resumed");
} }
protected Command assertCommandReceived() throws InterruptedException { protected Command assertCommandReceived() throws InterruptedException {
Command answer = null; Command answer = null;
synchronized (lock) { synchronized (lock) {
@ -125,4 +165,12 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
return answer; return answer;
} }
protected abstract Transport createConsumer() throws Exception;
protected abstract Transport createProducer() throws Exception;
protected TransportServer createServer() throws Exception {
return null;
}
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
@ -27,17 +28,28 @@ import java.net.URI;
*/ */
public class UdpTransportTest extends UdpTestSupport { public class UdpTransportTest extends UdpTestSupport {
protected String producerURI = "udp://localhost:8830"; protected int consumerPort = 8830;
protected String consumerURI = "udp://localhost:8831?port=8830"; protected String producerURI = "udp://localhost:" + consumerPort;
//protected String producerURI = "udp://localhost:8830";
//protected String consumerURI = "udp://localhost:8831?port=8830";
protected Transport createProducer() throws Exception { protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI); System.out.println("Producer using URI: " + producerURI);
return TransportFactory.connect(new URI(producerURI));
// The WireFormatNegotiator means we can only connect to servers
return new UdpTransport(createWireFormat(), new URI(producerURI));
//return TransportFactory.connect(new URI(producerURI));
} }
protected Transport createConsumer() throws Exception { protected Transport createConsumer() throws Exception {
System.out.println("Consumer using URI: " + consumerURI); System.out.println("Consumer on port: " + consumerPort);
return TransportFactory.connect(new URI(consumerURI)); return new UdpTransport(createWireFormat(), consumerPort);
//return TransportFactory.connect(new URI(consumerURI));
}
protected OpenWireFormat createWireFormat() {
return new OpenWireFormat();
} }
} }

View File

@ -0,0 +1,56 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import java.net.URI;
/**
*
* @version $Revision$
*/
public class UdpTransportUsingServerTest extends UdpTestSupport {
protected int consumerPort = 8830;
protected String producerURI = "udp://localhost:" + consumerPort;
protected String serverURI = producerURI;
protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI);
return TransportFactory.connect(new URI(producerURI));
}
protected TransportServer createServer() throws Exception {
return TransportFactory.bind("byBroker", new URI(serverURI));
}
protected Transport createConsumer() throws Exception {
return null;
}
protected OpenWireFormat createWireFormat() {
OpenWireFormat answer = new OpenWireFormat();
answer.setCacheEnabled(false);
answer.setSizePrefixDisabled(true);
return answer;
}
}