refactor of the UDP transport to enable the ReliableTransport by default

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-14 16:34:34 +00:00
parent 72b382d0bd
commit 364337b9aa
18 changed files with 207 additions and 226 deletions

View File

@ -328,6 +328,7 @@ public class BrokerService implements Service {
return; return;
} }
try {
processHelperProperties(); processHelperProperties();
BrokerRegistry.getInstance().bind(getBrokerName(), this); BrokerRegistry.getInstance().bind(getBrokerName(), this);
@ -354,6 +355,11 @@ public class BrokerService implements Service {
log.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ") started"); log.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ") started");
} }
catch (Exception e) {
log.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
throw e;
}
}
public void stop() throws Exception { public void stop() throws Exception {
if (! started.compareAndSet(true, false)) { if (! started.compareAndSet(true, false)) {

View File

@ -50,10 +50,6 @@ public class LastPartialCommand extends BaseCommand {
* @param completeCommand the newly unmarshalled complete command * @param completeCommand the newly unmarshalled complete command
*/ */
public void configure(Command completeCommand) { public void configure(Command completeCommand) {
// overwrite the commandId as the numbers change when we introduce
// fragmentation commands
completeCommand.setCommandId(getCommandId());
// copy across the transient properties // copy across the transient properties
completeCommand.setFrom(getFrom()); completeCommand.setFrom(getFrom());

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -37,30 +38,25 @@ public class ResponseCorrelator extends TransportFilter {
private static final Log log = LogFactory.getLog(ResponseCorrelator.class); private static final Log log = LogFactory.getLog(ResponseCorrelator.class);
private final ConcurrentHashMap requestMap = new ConcurrentHashMap(); private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
private int lastCommandId = 0; private IntSequenceGenerator sequenceGenerator;
public synchronized int getNextCommandId() {
return ++lastCommandId;
}
public ResponseCorrelator(Transport next) { public ResponseCorrelator(Transport next) {
this(next, new IntSequenceGenerator());
}
public ResponseCorrelator(Transport next, IntSequenceGenerator sequenceGenerator) {
super(next); super(next);
this.sequenceGenerator = sequenceGenerator;
} }
public void oneway(Command command) throws IOException { public void oneway(Command command) throws IOException {
// a parent transport could have set the ID command.setCommandId(sequenceGenerator.getNextSequenceId());
if (command.getCommandId() == 0) {
command.setCommandId(getNextCommandId());
}
command.setResponseRequired(false); command.setResponseRequired(false);
next.oneway(command); next.oneway(command);
} }
public FutureResponse asyncRequest(Command command) throws IOException { public FutureResponse asyncRequest(Command command) throws IOException {
// a parent transport could have set the ID command.setCommandId(sequenceGenerator.getNextSequenceId());
if (command.getCommandId() == 0) {
command.setCommandId(getNextCommandId());
}
command.setResponseRequired(true); command.setResponseRequired(true);
FutureResponse future = new FutureResponse(); FutureResponse future = new FutureResponse();
requestMap.put(new Integer(command.getCommandId()), future); requestMap.put(new Integer(command.getCommandId()), future);
@ -93,6 +89,10 @@ public class ResponseCorrelator extends TransportFilter {
} }
} }
public IntSequenceGenerator getSequenceGenerator() {
return sequenceGenerator;
}
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }

View File

@ -22,6 +22,7 @@ import org.apache.activemq.openwire.CommandIdComparator;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -48,6 +49,12 @@ public class ReliableTransport extends ResponseCorrelator {
this.replayStrategy = replayStrategy; this.replayStrategy = replayStrategy;
} }
public ReliableTransport(Transport next, IntSequenceGenerator sequenceGenerator, ReplayStrategy replayStrategy) {
super(next, sequenceGenerator);
this.replayStrategy = replayStrategy;
}
public Response request(Command command) throws IOException { public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command); FutureResponse response = asyncRequest(command);
while (true) { while (true) {
@ -155,14 +162,31 @@ public class ReliableTransport extends ResponseCorrelator {
this.expectedCounter = expectedCounter; this.expectedCounter = expectedCounter;
} }
public String toString() {
return next.toString(); public int getRequestTimeout() {
return requestTimeout;
}
/**
* Sets the default timeout of requests before starting to request commands are replayed
*/
public void setRequestTimeout(int requestTimeout) {
this.requestTimeout = requestTimeout;
} }
public ReplayStrategy getReplayStrategy() {
return replayStrategy;
}
public String toString() {
return next.toString();
}
/** /**
* Lets attempt to replay the request as a command may have disappeared * Lets attempt to replay the request as a command may have disappeared
*/ */
protected void replayRequest(Command command, FutureResponse response) { protected void replayRequest(Command command, FutureResponse response) {
log.debug("Still waiting for response on: " + this + " to command: " + command);
} }
} }

View File

@ -16,14 +16,11 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map;
/** /**
* *
@ -33,7 +30,7 @@ public interface CommandChannel extends Service {
public abstract Command read() throws IOException; public abstract Command read() throws IOException;
public abstract void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException; public abstract void write(Command command, SocketAddress address) throws IOException;
public abstract int getDatagramSize(); public abstract int getDatagramSize();

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import org.activeio.ByteArrayInputStream; import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream; import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -26,6 +24,7 @@ import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -35,7 +34,6 @@ import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Map;
/** /**
* A strategy for reading datagrams and de-fragmenting them together. * A strategy for reading datagrams and de-fragmenting them together.
@ -46,8 +44,8 @@ public class CommandDatagramChannel implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramChannel.class); private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
private final UdpTransport transport;
private final String name; private final String name;
private final IntSequenceGenerator sequenceGenerator;
private DatagramChannel channel; private DatagramChannel channel;
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool; private ByteBufferPool bufferPool;
@ -65,9 +63,9 @@ public class CommandDatagramChannel implements CommandChannel {
private int defaultMarshalBufferSize = 64 * 1024; private int defaultMarshalBufferSize = 64 * 1024;
public CommandDatagramChannel(UdpTransport transport, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, public CommandDatagramChannel(UdpTransport transport, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) { SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
this.transport = transport;
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
@ -75,6 +73,10 @@ public class CommandDatagramChannel implements CommandChannel {
this.targetAddress = targetAddress; this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
this.name = transport.toString(); this.name = transport.toString();
this.sequenceGenerator = transport.getSequenceGenerator();
if (sequenceGenerator == null) {
throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
}
} }
public String toString() { public String toString() {
@ -130,12 +132,9 @@ public class CommandDatagramChannel implements CommandChannel {
return answer; return answer;
} }
public void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException { public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
if (!command.isWireFormatInfo() && command.getCommandId() == 0) {
command.setCommandId(transport.getNextCommandId());
}
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer)); wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray(); byte[] data = largeBuffer.toByteArray();
@ -202,7 +201,7 @@ public class CommandDatagramChannel implements CommandChannel {
int commandId = command.getCommandId(); int commandId = command.getCommandId();
if (fragment > 0) { if (fragment > 0) {
commandId = transport.getNextCommandId(); commandId = sequenceGenerator.getNextSequenceId();
} }
writeBuffer.putInt(commandId); writeBuffer.putInt(commandId);
if (bs == null) { if (bs == null) {
@ -216,12 +215,12 @@ public class CommandDatagramChannel implements CommandChannel {
writeBuffer.put(data, offset, chunkSize); writeBuffer.put(data, offset, chunkSize);
offset += chunkSize; offset += chunkSize;
sendWriteBuffer(address); sendWriteBuffer(address, commandId);
} }
// now lets write the last partial command // now lets write the last partial command
command = new LastPartialCommand(command.isResponseRequired()); command = new LastPartialCommand(command.isResponseRequired());
command.setCommandId(transport.getNextCommandId()); command.setCommandId(sequenceGenerator.getNextSequenceId());
largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer)); wireFormat.marshal(command, new DataOutputStream(largeBuffer));
@ -232,11 +231,7 @@ public class CommandDatagramChannel implements CommandChannel {
} }
writeBuffer.put(data); writeBuffer.put(data);
sendWriteBuffer(address, command.getCommandId());
if (command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), future);
}
sendWriteBuffer(address);
} }
} }
@ -281,11 +276,11 @@ public class CommandDatagramChannel implements CommandChannel {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException { protected void sendWriteBuffer(SocketAddress address, int commandId) throws IOException {
writeBuffer.flip(); writeBuffer.flip();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram to: " + address); log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
} }
channel.send(writeBuffer, address); channel.send(writeBuffer, address);
} }

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import org.activeio.ByteArrayInputStream; import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream; import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -27,8 +24,7 @@ import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,7 +36,6 @@ import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map;
/** /**
* A strategy for reading datagrams and de-fragmenting them together. * A strategy for reading datagrams and de-fragmenting them together.
@ -51,7 +46,6 @@ public class CommandDatagramSocket implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramSocket.class); private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
private final UdpTransport transport;
private final String name; private final String name;
private DatagramSocket channel; private DatagramSocket channel;
private InetAddress targetAddress; private InetAddress targetAddress;
@ -59,16 +53,12 @@ public class CommandDatagramSocket implements CommandChannel {
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
private int datagramSize = 4 * 1024; private int datagramSize = 4 * 1024;
private DatagramHeaderMarshaller headerMarshaller; private DatagramHeaderMarshaller headerMarshaller;
private IntSequenceGenerator sequenceGenerator;
// reading
private Object readLock = new Object(); private Object readLock = new Object();
// writing
private Object writeLock = new Object(); private Object writeLock = new Object();
public CommandDatagramSocket(UdpTransport transport, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, public CommandDatagramSocket(UdpTransport transport, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress,
int targetPort, DatagramHeaderMarshaller headerMarshaller) { int targetPort, DatagramHeaderMarshaller headerMarshaller) {
this.transport = transport;
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
@ -76,6 +66,10 @@ public class CommandDatagramSocket implements CommandChannel {
this.targetPort = targetPort; this.targetPort = targetPort;
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
this.name = transport.toString(); this.name = transport.toString();
this.sequenceGenerator = transport.getSequenceGenerator();
if (sequenceGenerator == null) {
throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
}
} }
public String toString() { public String toString() {
@ -115,15 +109,14 @@ public class CommandDatagramSocket implements CommandChannel {
return answer; return answer;
} }
public void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException { public void write(Command command, SocketAddress address) throws IOException {
InetSocketAddress ia = (InetSocketAddress) address; InetSocketAddress ia = (InetSocketAddress) address;
write(command, ia.getAddress(), ia.getPort(), requestMap, future); write(command, ia.getAddress(), ia.getPort());
} }
public void write(Command command, InetAddress address, int port, Map requestMap, Future future) throws IOException { public void write(Command command, InetAddress address, int port) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
command.setCommandId(transport.getNextCommandId());
ByteArrayOutputStream writeBuffer = createByteArrayOutputStream(); ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(writeBuffer); DataOutputStream dataOut = new DataOutputStream(writeBuffer);
headerMarshaller.writeHeader(command, dataOut); headerMarshaller.writeHeader(command, dataOut);
@ -133,10 +126,7 @@ public class CommandDatagramSocket implements CommandChannel {
wireFormat.marshal(command, dataOut); wireFormat.marshal(command, dataOut);
if (remaining(writeBuffer) >= 0) { if (remaining(writeBuffer) >= 0) {
if (command.isResponseRequired()) { sendWriteBuffer(address, port, writeBuffer, command.getCommandId());
requestMap.put(new Integer(command.getCommandId()), future);
}
sendWriteBuffer(address, port, writeBuffer);
} }
else { else {
// lets split the command up into chunks // lets split the command up into chunks
@ -193,7 +183,7 @@ public class CommandDatagramSocket implements CommandChannel {
int commandId = command.getCommandId(); int commandId = command.getCommandId();
if (fragment > 0) { if (fragment > 0) {
commandId = transport.getNextCommandId(); commandId = sequenceGenerator.getNextSequenceId();
} }
dataOut.writeInt(commandId); dataOut.writeInt(commandId);
if (bs == null) { if (bs == null) {
@ -207,21 +197,18 @@ public class CommandDatagramSocket implements CommandChannel {
dataOut.write(data, offset, chunkSize); dataOut.write(data, offset, chunkSize);
offset += chunkSize; offset += chunkSize;
sendWriteBuffer(address, port, writeBuffer); sendWriteBuffer(address, port, writeBuffer, commandId);
} }
// now lets write the last partial command // now lets write the last partial command
command = new LastPartialCommand(command.isResponseRequired()); command = new LastPartialCommand(command.isResponseRequired());
command.setCommandId(transport.getNextCommandId()); command.setCommandId(sequenceGenerator.getNextSequenceId());
writeBuffer.reset(); writeBuffer.reset();
headerMarshaller.writeHeader(command, dataOut); headerMarshaller.writeHeader(command, dataOut);
wireFormat.marshal(command, dataOut); wireFormat.marshal(command, dataOut);
if (command.isResponseRequired()) { sendWriteBuffer(address, port, writeBuffer, command.getCommandId());
requestMap.put(new Integer(command.getCommandId()), future);
}
sendWriteBuffer(address, port, writeBuffer);
} }
} }
} }
@ -266,9 +253,9 @@ public class CommandDatagramSocket implements CommandChannel {
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException { protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram to: " + address); log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
} }
byte[] data = writeBuffer.toByteArray(); byte[] data = writeBuffer.toByteArray();
DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port); DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BaseEndpoint; import org.apache.activemq.command.BaseEndpoint;
import java.net.InetAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**

View File

@ -17,14 +17,12 @@
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;

View File

@ -16,20 +16,15 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -69,10 +64,8 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private int port; private int port;
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private String description = null; private String description = null;
private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
private int lastCommandId = 0;
private Runnable runnable; private Runnable runnable;
private IntSequenceGenerator sequenceGenerator;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException { protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
@ -100,28 +93,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.description = getProtocolName() + "Server@"; this.description = getProtocolName() + "Server@";
} }
public TransportFilter createFilter(Transport transport) {
return new TransportFilter(transport) {
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if (command.isResponse()) {
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if (future != null) {
future.set(response);
}
else {
if (debug)
log.debug("Received unexpected response for command id: " + response.getCorrelationId());
}
}
else {
super.onCommand(command);
}
}
};
}
/** /**
* A one way asynchronous send * A one way asynchronous send
*/ */
@ -130,48 +101,16 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
} }
/** /**
* A one way asynchronous send * A one way asynchronous send to a given address
*/ */
public void oneway(Command command, FutureResponse future) throws IOException { public void oneway(Command command, SocketAddress address) throws IOException {
oneway(command, targetAddress, future);
}
protected void oneway(Command command, SocketAddress address, FutureResponse future) throws IOException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command); log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
} }
checkStarted(command); checkStarted(command);
commandChannel.write(command, address, requestMap, future); commandChannel.write(command, address);
} }
/**
* A one way asynchronous send to a given address
*/
public void oneway(Command command, SocketAddress address) throws IOException {
oneway(command, address, null);
}
public FutureResponse asyncRequest(Command command) throws IOException {
if (command.getCommandId() == 0) {
command.setCommandId(getNextCommandId());
}
command.setResponseRequired(true);
FutureResponse future = new FutureResponse();
oneway(command, future);
return future;
}
public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command);
return response.getResult();
}
public Response request(Command command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command);
return response.getResult(timeout);
}
public void setStartupRunnable(Runnable runnable) { public void setStartupRunnable(Runnable runnable) {
this.runnable = runnable; this.runnable = runnable;
} }
@ -363,6 +302,15 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.checkSequenceNumbers = checkSequenceNumbers; this.checkSequenceNumbers = checkSequenceNumbers;
} }
public IntSequenceGenerator getSequenceGenerator() {
return sequenceGenerator;
}
public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
this.sequenceGenerator = sequenceGenerator;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -451,8 +399,4 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
protected SocketAddress getTargetAddress() { protected SocketAddress getTargetAddress() {
return targetAddress; return targetAddress;
} }
protected synchronized int getNextCommandId() {
return ++lastCommandId;
}
} }

View File

@ -19,21 +19,19 @@ package org.apache.activemq.transport.udp;
import org.activeio.command.WireFormat; import org.activeio.command.WireFormat;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner; import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport; import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
@ -58,7 +56,15 @@ public class UdpTransportFactory extends TransportFactory {
UdpTransport transport = new UdpTransport(openWireFormat, port); UdpTransport transport = new UdpTransport(openWireFormat, port);
Transport configuredTransport = configure(transport, wf, options, true); Transport configuredTransport = configure(transport, wf, options, true);
UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport); ReplayStrategy replayStrategy = null;
if (configuredTransport instanceof ReliableTransport) {
ReliableTransport rt = (ReliableTransport) configuredTransport;
replayStrategy = rt.getReplayStrategy();
}
if (replayStrategy == null) {
replayStrategy = createReplayStrategy();
}
UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, replayStrategy);
return server; return server;
} }
catch (URISyntaxException e) { catch (URISyntaxException e) {
@ -92,8 +98,6 @@ public class UdpTransportFactory extends TransportFactory {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
} }
// TODO should we have this?
//transport = udpTransport.createFilter(transport);
return transport; return transport;
} }
@ -119,16 +123,24 @@ public class UdpTransportFactory extends TransportFactory {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
} }
// add reliabilty
//transport = new ReliableTransport(transport, createReplayStrategy());
// deal with fragmentation // deal with fragmentation
if (server) {
// we don't want to do reliable checks on this transport as we
// delegate to one that does
transport = new CommandJoiner(transport, openWireFormat); transport = new CommandJoiner(transport, openWireFormat);
udpTransport.setSequenceGenerator(new IntSequenceGenerator());
transport = udpTransport.createFilter(transport);
return transport; return transport;
} }
else {
ReliableTransport reliableTransport = new ReliableTransport(transport, createReplayStrategy());
udpTransport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
// Joiner must be on outside as the inbound messages must be
// processed by the reliable transport first
return new CommandJoiner(reliableTransport, openWireFormat);
}
}
protected ReplayStrategy createReplayStrategy() { protected ReplayStrategy createReplayStrategy() {
return new ExceptionIfDroppedReplayStrategy(1); return new ExceptionIfDroppedReplayStrategy(1);
@ -147,17 +159,14 @@ public class UdpTransportFactory extends TransportFactory {
}; };
/* /*
transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { * transport = new WireFormatNegotiator(transport,
protected void onWireFormatNegotiated(WireFormatInfo info) { * asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
// lets switch to the target endpoint * protected void onWireFormatNegotiated(WireFormatInfo info) { // lets
// based on the last packet that was received * switch to the target endpoint // based on the last packet that was
// so that all future requests go to the newly created UDP channel * received // so that all future requests go to the newly created UDP
Endpoint from = info.getFrom(); * channel Endpoint from = info.getFrom();
System.out.println("####Êsetting the client side target to: " + from); * System.out.println("####Êsetting the client side target to: " +
udpTransport.setTargetEndpoint(from); * from); udpTransport.setTargetEndpoint(from); } }; return transport;
}
};
return transport;
*/ */
} }

View File

@ -18,17 +18,15 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner; import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport; import org.apache.activemq.transport.TransportServerSupport;
import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -49,15 +47,17 @@ public class UdpTransportServer extends TransportServerSupport {
private static final Log log = LogFactory.getLog(UdpTransportServer.class); private static final Log log = LogFactory.getLog(UdpTransportServer.class);
private UdpTransport serverTransport; private UdpTransport serverTransport;
private ReplayStrategy replayStrategy;
private Transport configuredTransport; private Transport configuredTransport;
private boolean usingWireFormatNegotiation; private boolean usingWireFormatNegotiation;
private Map transports = new HashMap(); private Map transports = new HashMap();
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) {
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
super(connectURI); super(connectURI);
this.serverTransport = serverTransport; this.serverTransport = serverTransport;
this.configuredTransport = configuredTransport; this.configuredTransport = configuredTransport;
this.replayStrategy = replayStrategy;
// lets disable the incremental checking of the sequence numbers // lets disable the incremental checking of the sequence numbers
// as we are getting messages from many different clients // as we are getting messages from many different clients
@ -137,8 +137,6 @@ public class UdpTransportServer extends TransportServerSupport {
} }
protected Transport configureTransport(Transport transport) { protected Transport configureTransport(Transport transport) {
// transport = new ResponseCorrelator(transport);
if (serverTransport.getMaxInactivityDuration() > 0) { if (serverTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
} }
@ -149,28 +147,26 @@ public class UdpTransportServer extends TransportServerSupport {
protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException { protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
if (endpoint == null) { if (endpoint == null) {
//log.error("No endpoint available for command: " + command);
throw new IOException("No endpoint available for command: " + command); throw new IOException("No endpoint available for command: " + command);
} }
final SocketAddress address = endpoint.getAddress(); final SocketAddress address = endpoint.getAddress();
final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(connectionWireFormat, address); final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat); final ReliableTransport reliableTransport = new ReliableTransport(transport, replayStrategy);
transport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
// lets pass in the received transport // Joiner must be on outside as the inbound messages must be processed by the reliable transport first
return new TransportFilter(configuredTransport) { return new CommandJoiner(reliableTransport, connectionWireFormat) {
public void start() throws Exception { public void start() throws Exception {
super.start(); super.start();
onCommand(command); reliableTransport.onCommand(command);
} }
}; };
/** /**
// return configuredTransport;
// configuredTransport = transport.createFilter(configuredTransport);
final WireFormatNegotiator wireFormatNegotiator = new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport final WireFormatNegotiator wireFormatNegotiator = new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport
.getMinmumWireFormatVersion()) { .getMinmumWireFormatVersion()) {
@ -188,14 +184,5 @@ public class UdpTransportServer extends TransportServerSupport {
}; };
return wireFormatNegotiator; return wireFormatNegotiator;
*/ */
/*
* transport.setStartupRunnable(new Runnable() {
*
* public void run() { System.out.println("Passing the incoming
* WireFormat into into: " + this);
* // process the inbound wireformat
* wireFormatNegotiator.onCommand(command); }});
*/
} }
} }

View File

@ -0,0 +1,34 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
public class IntSequenceGenerator {
private int lastSequenceId;
public synchronized int getNextSequenceId() {
return ++lastSequenceId;
}
public synchronized int getLastSequenceId() {
return lastSequenceId;
}
public synchronized void setLastSequenceId(int l) {
lastSequenceId = l;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.udp.UdpTransport; import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.transport.udp.UdpTransportTest; import org.apache.activemq.transport.udp.UdpTransportTest;
import org.apache.activemq.util.IntSequenceGenerator;
import java.net.URI; import java.net.URI;
@ -42,12 +43,14 @@ public class MulticastTransportTest extends UdpTransportTest {
OpenWireFormat wireFormat = createWireFormat(); OpenWireFormat wireFormat = createWireFormat();
MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI)); MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
transport.setSequenceGenerator(new IntSequenceGenerator());
return new CommandJoiner(transport, wireFormat); return new CommandJoiner(transport, wireFormat);
} }
protected Transport createConsumer() throws Exception { protected Transport createConsumer() throws Exception {
OpenWireFormat wireFormat = createWireFormat(); OpenWireFormat wireFormat = createWireFormat();
MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI)); MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
transport.setSequenceGenerator(new IntSequenceGenerator());
return new CommandJoiner(transport, wireFormat); return new CommandJoiner(transport, wireFormat);
} }

View File

@ -24,7 +24,7 @@ public class UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest extends UdpSen
protected void setUp() throws Exception { protected void setUp() throws Exception {
largeMessages = true; largeMessages = true;
//messageCount = 1; //messageCount = 2;
super.setUp(); super.setUp();
} }

View File

@ -45,6 +45,7 @@ public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWit
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService(); BrokerService answer = new BrokerService();
answer.setPersistent(false); answer.setPersistent(false);
answer.setUseJmx(false);
answer.addConnector(brokerURI); answer.addConnector(brokerURI);
return answer; return answer;
} }

View File

@ -54,7 +54,6 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
ConsumerInfo expected = new ConsumerInfo(); ConsumerInfo expected = new ConsumerInfo();
expected.setSelector("Cheese"); expected.setSelector("Cheese");
expected.setExclusive(true); expected.setExclusive(true);
expected.setCommandId((short) 12);
expected.setExclusive(true); expected.setExclusive(true);
expected.setPrefetchSize(3456); expected.setPrefetchSize(3456);
@ -67,7 +66,6 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
ConsumerInfo actual = (ConsumerInfo) received; ConsumerInfo actual = (ConsumerInfo) received;
assertEquals("Selector", expected.getSelector(), actual.getSelector()); assertEquals("Selector", expected.getSelector(), actual.getSelector());
assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive()); assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive());
assertEquals("getCommandId", expected.getCommandId(), actual.getCommandId());
assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize()); assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize());
} }
catch (Exception e) { catch (Exception e) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner; import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IntSequenceGenerator;
import java.net.URI; import java.net.URI;
@ -40,6 +41,7 @@ public class UdpTransportTest extends UdpTestSupport {
OpenWireFormat wireFormat = createWireFormat(); OpenWireFormat wireFormat = createWireFormat();
UdpTransport transport = new UdpTransport(wireFormat, new URI(producerURI)); UdpTransport transport = new UdpTransport(wireFormat, new URI(producerURI));
transport.setSequenceGenerator(new IntSequenceGenerator());
return new CommandJoiner(transport, wireFormat); return new CommandJoiner(transport, wireFormat);
} }
@ -47,6 +49,7 @@ public class UdpTransportTest extends UdpTestSupport {
System.out.println("Consumer on port: " + consumerPort); System.out.println("Consumer on port: " + consumerPort);
OpenWireFormat wireFormat = createWireFormat(); OpenWireFormat wireFormat = createWireFormat();
UdpTransport transport = new UdpTransport(wireFormat, consumerPort); UdpTransport transport = new UdpTransport(wireFormat, consumerPort);
transport.setSequenceGenerator(new IntSequenceGenerator());
return new CommandJoiner(transport, wireFormat); return new CommandJoiner(transport, wireFormat);
} }