diff --git a/activemq-core/project.xml b/activemq-core/project.xml
index f307b11478..48b20b9feb 100755
--- a/activemq-core/project.xml
+++ b/activemq-core/project.xml
@@ -360,10 +360,6 @@
**/MultipleTestsWithSpringFactoryBeanTest.*
**/MultipleTestsWithXBeanFactoryBeanTest.*
**/MultipleTestsWithSpringXBeanFactoryBeanTest.*
-
-
- **/UdpSendReceiveWithTwoConnectionsTest.*
- **/MulticastTransportTest.*
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java
index 2c90ab2720..b577c17083 100644
--- a/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/BaseEndpoint.java
@@ -33,4 +33,9 @@ public class BaseEndpoint implements Endpoint {
return name;
}
+ public String toString() {
+ return "Endpoint[" + name + "]";
+ }
+
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
index 518fc64450..f712239b09 100644
--- a/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
@@ -31,9 +31,8 @@ public class LastPartialCommand extends BaseCommand {
public LastPartialCommand() {
}
- public LastPartialCommand(Command command) {
- setCommandId(command.getCommandId());
- setResponseRequired(command.isResponseRequired());
+ public LastPartialCommand(boolean responseRequired) {
+ setResponseRequired(responseRequired);
}
public byte getDataStructureType() {
@@ -44,4 +43,22 @@ public class LastPartialCommand extends BaseCommand {
throw new IllegalStateException("The transport layer should filter out LastPartialCommand instances but received: " + this);
}
+ /**
+ * Lets copy across the required fields from this last partial command to
+ * the newly unmarshalled complete command
+ *
+ * @param completeCommand the newly unmarshalled complete command
+ */
+ public void configure(Command completeCommand) {
+ // overwrite the commandId as the numbers change when we introduce
+ // fragmentation commands
+ completeCommand.setCommandId(getCommandId());
+
+ // copy across the transient properties
+ completeCommand.setFrom(getFrom());
+
+ // TODO should not be required as the large command would be marshalled with this property
+ //completeCommand.setResponseRequired(isResponseRequired());
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
index c098c3b936..1db19a5c26 100644
--- a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
@@ -128,4 +128,10 @@ public class PartialCommand implements Command {
public boolean isMarshallAware() {
return false;
}
+
+ public String toString() {
+ return "PartialCommand[id: " + commandId + " data: " + data.length + " byte(s)]";
+ }
+
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java b/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
index 9eb6df7c40..ba462bbbd9 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
@@ -21,6 +21,8 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -33,6 +35,7 @@ import java.io.IOException;
* @version $Revision$
*/
public class CommandJoiner extends TransportFilter {
+ private static final Log log = LogFactory.getLog(CommandJoiner.class);
private ByteArrayOutputStream out = new ByteArrayOutputStream();
private OpenWireFormat wireFormat;
@@ -57,13 +60,16 @@ public class CommandJoiner extends TransportFilter {
else if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
try {
byte[] fullData = out.toByteArray();
+ out.reset();
Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData)));
- completeCommand.setCommandId(command.getCommandId());
- completeCommand.setResponseRequired(command.isResponseRequired());
- resetBuffer();
+
+ LastPartialCommand lastCommand = (LastPartialCommand) command;
+ lastCommand.configure(completeCommand);
+
getTransportListener().onCommand(completeCommand);
}
catch (IOException e) {
+ log.warn("Failed to unmarshal partial command: " + command);
getTransportListener().onException(e);
}
}
@@ -74,14 +80,10 @@ public class CommandJoiner extends TransportFilter {
public void stop() throws Exception {
super.stop();
- resetBuffer();
+ out = null;
}
public String toString() {
return next.toString();
}
-
- protected void resetBuffer() {
- out.reset();
- }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
index 539aa511e0..91121f432e 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
@@ -32,14 +32,14 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
*
* @version $Revision: 1.4 $
*/
-final public class ResponseCorrelator extends TransportFilter {
+public class ResponseCorrelator extends TransportFilter {
private static final Log log = LogFactory.getLog(ResponseCorrelator.class);
private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
private int lastCommandId = 0;
- synchronized int getNextCommandId() {
+ public synchronized int getNextCommandId() {
return ++lastCommandId;
}
@@ -48,13 +48,19 @@ final public class ResponseCorrelator extends TransportFilter {
}
public void oneway(Command command) throws IOException {
- command.setCommandId(getNextCommandId());
+ // a parent transport could have set the ID
+ if (command.getCommandId() == 0) {
+ command.setCommandId(getNextCommandId());
+ }
command.setResponseRequired(false);
next.oneway(command);
}
public FutureResponse asyncRequest(Command command) throws IOException {
- command.setCommandId(getNextCommandId());
+ // a parent transport could have set the ID
+ if (command.getCommandId() == 0) {
+ command.setCommandId(getNextCommandId());
+ }
command.setResponseRequired(true);
FutureResponse future = new FutureResponse();
requestMap.put(new Integer(command.getCommandId()), future);
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
index a2ee6b28bf..0ab2ec7d97 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
@@ -88,15 +88,15 @@ public class WireFormatNegotiator extends TransportFilter {
try {
wireInfoSentDownLatch.await();
+ if (log.isDebugEnabled()) {
+ log.debug(this + " before negotiation: " + wireFormat);
+ }
if( !info.isValid() ) {
onException(new IOException("Remote wire format magic is invalid"));
} else if( info.getVersion() < minimumVersion ) {
onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")"));
}
- if (log.isDebugEnabled()) {
- log.debug(this + " before negotiation: " + wireFormat);
- }
wireFormat.renegociatWireFormat(info);
if (log.isDebugEnabled()) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
index e88b8ecf94..c12c1fc602 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
@@ -93,7 +93,7 @@ public class MulticastTransport extends UdpTransport {
socket.joinGroup(mcastAddress);
socket.setSoTimeout((int) keepAliveInterval);
- return new CommandDatagramSocket(toString(), socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
+ return new CommandDatagramSocket(this, socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
}
protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
index 7649c1c514..27038136c9 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
@@ -28,13 +28,20 @@ public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy {
private int maximumDifference = 5;
+ public ExceptionIfDroppedReplayStrategy() {
+ }
+
+ public ExceptionIfDroppedReplayStrategy(int maximumDifference) {
+ this.maximumDifference = maximumDifference;
+ }
+
public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
int difference = actualCounter - expectedCounter;
long count = Math.abs(difference);
if (count > maximumDifference) {
throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
}
-
+
// lets discard old commands
return difference > 0;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
index 72db68dfec..7a258c26fa 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
@@ -17,9 +17,13 @@
package org.apache.activemq.transport.reliable;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.CommandIdComparator;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.SortedSet;
@@ -31,17 +35,54 @@ import java.util.TreeSet;
*
* @version $Revision$
*/
-public class ReliableTransport extends TransportFilter {
+public class ReliableTransport extends ResponseCorrelator {
+ private static final Log log = LogFactory.getLog(ReliableTransport.class);
+
private ReplayStrategy replayStrategy;
private SortedSet commands = new TreeSet(new CommandIdComparator());
private int expectedCounter = 1;
+ private int requestTimeout = 2000;
public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
super(next);
this.replayStrategy = replayStrategy;
}
+ public Response request(Command command) throws IOException {
+ FutureResponse response = asyncRequest(command);
+ while (true) {
+ Response result = response.getResult(requestTimeout);
+ if (result != null) {
+ return result;
+ }
+ replayRequest(command, response);
+ }
+ }
+
+ public Response request(Command command, int timeout) throws IOException {
+ FutureResponse response = asyncRequest(command);
+ while (timeout > 0) {
+ int time = timeout;
+ if (timeout > requestTimeout) {
+ time = requestTimeout;
+ }
+ Response result = response.getResult(time);
+ if (result != null) {
+ return result;
+ }
+ replayRequest(command, response);
+ timeout -= time;
+ }
+ return response.getResult(0);
+ }
+
public void onCommand(Command command) {
+ // lets pass wireformat through
+ if (command.isWireFormatInfo()) {
+ super.onCommand(command);
+ return;
+ }
+
int actualCounter = command.getCommandId();
boolean valid = expectedCounter == actualCounter;
@@ -49,9 +90,12 @@ public class ReliableTransport extends TransportFilter {
synchronized (commands) {
try {
boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
-
+
if (keep) {
// lets add it to the list for later on
+ if (log.isDebugEnabled()) {
+ log.debug("Received out of order command which is being buffered for later: " + command);
+ }
commands.add(command);
}
}
@@ -75,7 +119,7 @@ public class ReliableTransport extends TransportFilter {
// we've got a valid header so increment counter
replayStrategy.onReceivedPacket(this, expectedCounter);
expectedCounter++;
- getTransportListener().onCommand(command);
+ super.onCommand(command);
synchronized (commands) {
// we could have more commands left
@@ -98,13 +142,14 @@ public class ReliableTransport extends TransportFilter {
return commands.size();
}
}
-
+
public int getExpectedCounter() {
return expectedCounter;
}
/**
- * This property should never really be set - but is mutable primarily for test cases
+ * This property should never really be set - but is mutable primarily for
+ * test cases
*/
public void setExpectedCounter(int expectedCounter) {
this.expectedCounter = expectedCounter;
@@ -114,4 +159,10 @@ public class ReliableTransport extends TransportFilter {
return next.toString();
}
+
+ /**
+ * Lets attempt to replay the request as a command may have disappeared
+ */
+ protected void replayRequest(Command command, FutureResponse response) {
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
index fd45d6891f..04ae516067 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
@@ -16,11 +16,14 @@
*/
package org.apache.activemq.transport.udp;
+import edu.emory.mathcs.backport.java.util.concurrent.Future;
+
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.Map;
/**
*
@@ -30,9 +33,7 @@ public interface CommandChannel extends Service {
public abstract Command read() throws IOException;
- public abstract void write(Command command) throws IOException;
-
- public abstract void write(Command command, SocketAddress address) throws IOException;
+ public abstract void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException;
public abstract int getDatagramSize();
@@ -45,4 +46,6 @@ public interface CommandChannel extends Service {
public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
+ public abstract void setTargetAddress(SocketAddress address);
+
}
\ No newline at end of file
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
index 4dc02e11ca..1a467ce7f5 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
@@ -16,9 +16,10 @@
*/
package org.apache.activemq.transport.udp;
+import edu.emory.mathcs.backport.java.util.concurrent.Future;
+
import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream;
-import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
@@ -34,6 +35,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
+import java.util.Map;
/**
* A strategy for reading datagrams and de-fragmenting them together.
@@ -44,6 +46,7 @@ public class CommandDatagramChannel implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
+ private final UdpTransport transport;
private final String name;
private DatagramChannel channel;
private OpenWireFormat wireFormat;
@@ -61,15 +64,17 @@ public class CommandDatagramChannel implements CommandChannel {
private ByteBuffer writeBuffer;
private int defaultMarshalBufferSize = 64 * 1024;
- public CommandDatagramChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
+
+ public CommandDatagramChannel(UdpTransport transport, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
- this.name = name;
+ this.transport = transport;
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller;
+ this.name = transport.toString();
}
public String toString() {
@@ -87,34 +92,21 @@ public class CommandDatagramChannel implements CommandChannel {
bufferPool.stop();
}
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#read()
- */
public Command read() throws IOException {
Command answer = null;
+ Endpoint from = null;
synchronized (readLock) {
while (true) {
readBuffer.clear();
SocketAddress address = channel.receive(readBuffer);
- /*
- if (address == null) {
- System.out.println("No address on packet: " + readBuffer);
- // continue;
- }
- */
-
readBuffer.flip();
if (readBuffer.limit() == 0) {
- //System.out.println("Empty packet!");
continue;
}
-
- //log.debug("buffer: " + readBuffer + " has remaining: " + readBuffer.remaining());
-
- Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
-
+ from = headerMarshaller.createEndpoint(readBuffer, address);
+
int remaining = readBuffer.remaining();
byte[] data = new byte[remaining];
readBuffer.get(data);
@@ -125,33 +117,25 @@ public class CommandDatagramChannel implements CommandChannel {
// buffering?
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
answer = (Command) wireFormat.unmarshal(dataIn);
- if (answer != null) {
- answer.setFrom(from);
- }
break;
}
}
if (answer != null) {
+ answer.setFrom(from);
+
if (log.isDebugEnabled()) {
- log.debug("Channel: " + name + " about to process: " + answer);
+ log.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
}
}
return answer;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command)
- */
- public void write(Command command) throws IOException {
- write(command, targetAddress);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command, java.net.SocketAddress)
- */
- public void write(Command command, SocketAddress address) throws IOException {
+ public void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException {
synchronized (writeLock) {
+ if (!command.isWireFormatInfo() && command.getCommandId() == 0) {
+ command.setCommandId(transport.getNextCommandId());
+ }
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray();
@@ -160,7 +144,7 @@ public class CommandDatagramChannel implements CommandChannel {
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
- if (size >= writeBuffer.remaining()) {
+ if (size > writeBuffer.remaining()) {
// lets split the command up into chunks
int offset = 0;
boolean lastFragment = false;
@@ -216,7 +200,11 @@ public class CommandDatagramChannel implements CommandChannel {
bs.marshal(writeBuffer);
}
- writeBuffer.putInt(command.getCommandId());
+ int commandId = command.getCommandId();
+ if (fragment > 0) {
+ commandId = transport.getNextCommandId();
+ }
+ writeBuffer.putInt(commandId);
if (bs == null) {
writeBuffer.put((byte) 1);
}
@@ -232,7 +220,9 @@ public class CommandDatagramChannel implements CommandChannel {
}
// now lets write the last partial command
- command = new LastPartialCommand(command);
+ command = new LastPartialCommand(command.isResponseRequired());
+ command.setCommandId(transport.getNextCommandId());
+
largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
data = largeBuffer.toByteArray();
@@ -243,6 +233,9 @@ public class CommandDatagramChannel implements CommandChannel {
writeBuffer.put(data);
+ if (command.isResponseRequired()) {
+ requestMap.put(new Integer(command.getCommandId()), future);
+ }
sendWriteBuffer(address);
}
}
@@ -250,16 +243,10 @@ public class CommandDatagramChannel implements CommandChannel {
// Properties
// -------------------------------------------------------------------------
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#getDatagramSize()
- */
public int getDatagramSize() {
return datagramSize;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#setDatagramSize(int)
- */
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
}
@@ -275,20 +262,23 @@ public class CommandDatagramChannel implements CommandChannel {
this.bufferPool = bufferPool;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#getHeaderMarshaller()
- */
public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.transport.udp.CommandChannel#setHeaderMarshaller(org.apache.activemq.transport.udp.DatagramHeaderMarshaller)
- */
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
+
+ public SocketAddress getTargetAddress() {
+ return targetAddress;
+ }
+
+ public void setTargetAddress(SocketAddress targetAddress) {
+ this.targetAddress = targetAddress;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException {
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
index 3735a637c6..ff7b78c687 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.transport.udp;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.Future;
+
import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
@@ -24,6 +27,8 @@ import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +40,7 @@ import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Map;
/**
* A strategy for reading datagrams and de-fragmenting them together.
@@ -45,6 +51,7 @@ public class CommandDatagramSocket implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
+ private final UdpTransport transport;
private final String name;
private DatagramSocket channel;
private InetAddress targetAddress;
@@ -59,16 +66,16 @@ public class CommandDatagramSocket implements CommandChannel {
// writing
private Object writeLock = new Object();
-
- public CommandDatagramSocket(String name, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, int targetPort,
- DatagramHeaderMarshaller headerMarshaller) {
- this.name = name;
+ public CommandDatagramSocket(UdpTransport transport, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress,
+ int targetPort, DatagramHeaderMarshaller headerMarshaller) {
+ this.transport = transport;
this.channel = channel;
this.wireFormat = wireFormat;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.targetPort = targetPort;
this.headerMarshaller = headerMarshaller;
+ this.name = transport.toString();
}
public String toString() {
@@ -100,7 +107,7 @@ public class CommandDatagramSocket implements CommandChannel {
}
if (answer != null) {
answer.setFrom(from);
-
+
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer);
}
@@ -108,23 +115,15 @@ public class CommandDatagramSocket implements CommandChannel {
return answer;
}
- public void write(Command command) throws IOException {
- write(command, targetAddress, targetPort);
+ public void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException {
+ InetSocketAddress ia = (InetSocketAddress) address;
+ write(command, ia.getAddress(), ia.getPort(), requestMap, future);
}
- public void write(Command command, SocketAddress address) throws IOException {
- if (address instanceof InetSocketAddress) {
- InetSocketAddress ia = (InetSocketAddress) address;
- write(command, ia.getAddress(), ia.getPort());
- }
- else {
- write(command);
- }
- }
-
- public void write(Command command, InetAddress address, int port) throws IOException {
+ public void write(Command command, InetAddress address, int port, Map requestMap, Future future) throws IOException {
synchronized (writeLock) {
+ command.setCommandId(transport.getNextCommandId());
ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(writeBuffer);
headerMarshaller.writeHeader(command, dataOut);
@@ -134,6 +133,9 @@ public class CommandDatagramSocket implements CommandChannel {
wireFormat.marshal(command, dataOut);
if (remaining(writeBuffer) >= 0) {
+ if (command.isResponseRequired()) {
+ requestMap.put(new Integer(command.getCommandId()), future);
+ }
sendWriteBuffer(address, port, writeBuffer);
}
else {
@@ -189,7 +191,11 @@ public class CommandDatagramSocket implements CommandChannel {
bs.marshal(dataOut);
}
- dataOut.writeInt(command.getCommandId());
+ int commandId = command.getCommandId();
+ if (fragment > 0) {
+ commandId = transport.getNextCommandId();
+ }
+ dataOut.writeInt(commandId);
if (bs == null) {
dataOut.write((byte) 1);
}
@@ -205,12 +211,16 @@ public class CommandDatagramSocket implements CommandChannel {
}
// now lets write the last partial command
- command = new LastPartialCommand(command);
+ command = new LastPartialCommand(command.isResponseRequired());
+ command.setCommandId(transport.getNextCommandId());
writeBuffer.reset();
headerMarshaller.writeHeader(command, dataOut);
wireFormat.marshal(command, dataOut);
+ if (command.isResponseRequired()) {
+ requestMap.put(new Integer(command.getCommandId()), future);
+ }
sendWriteBuffer(address, port, writeBuffer);
}
}
@@ -238,6 +248,22 @@ public class CommandDatagramSocket implements CommandChannel {
this.headerMarshaller = headerMarshaller;
}
+
+ public SocketAddress getTargetAddress() {
+ return new InetSocketAddress(targetAddress, targetPort);
+ }
+
+ public void setTargetAddress(SocketAddress address) {
+ if (address instanceof InetSocketAddress) {
+ InetSocketAddress ia = (InetSocketAddress) address;
+ targetAddress = ia.getAddress();
+ targetPort = ia.getPort();
+ }
+ else {
+ throw new IllegalArgumentException("Address must be instance of InetSocketAddress");
+ }
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException {
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
index 0f9db77ea2..25cc8d36e1 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
@@ -38,4 +38,5 @@ public class DatagramEndpoint extends BaseEndpoint {
return address;
}
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
index d19b5e2a98..ec0c468618 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
@@ -16,11 +16,17 @@
*/
package org.apache.activemq.transport.udp;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReplayStrategy;
@@ -28,6 +34,7 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.EOFException;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
@@ -62,6 +69,10 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private int port;
private int minmumWireFormatVersion;
private String description = null;
+ private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
+ private int lastCommandId = 0;
+
+ private Runnable runnable;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat;
@@ -89,6 +100,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.description = getProtocolName() + "Server@";
}
+ public TransportFilter createFilter(Transport transport) {
+ return new TransportFilter(transport) {
+ public void onCommand(Command command) {
+ boolean debug = log.isDebugEnabled();
+ if (command.isResponse()) {
+ Response response = (Response) command;
+ FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
+ if (future != null) {
+ future.set(response);
+ }
+ else {
+ if (debug)
+ log.debug("Received unexpected response for command id: " + response.getCorrelationId());
+ }
+ }
+ else {
+ super.onCommand(command);
+ }
+ }
+ };
+ }
+
/**
* A one way asynchronous send
*/
@@ -96,17 +129,53 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
oneway(command, targetAddress);
}
+ /**
+ * A one way asynchronous send
+ */
+ public void oneway(Command command, FutureResponse future) throws IOException {
+ oneway(command, targetAddress, future);
+ }
+
+ protected void oneway(Command command, SocketAddress address, FutureResponse future) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
+ }
+ checkStarted(command);
+ commandChannel.write(command, address, requestMap, future);
+ }
+
/**
* A one way asynchronous send to a given address
*/
public void oneway(Command command, SocketAddress address) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("Sending oneway from: " + this + " to target: " + targetAddress);
- }
- checkStarted(command);
- commandChannel.write(command, address);
+ oneway(command, address, null);
}
-
+
+ public FutureResponse asyncRequest(Command command) throws IOException {
+ if (command.getCommandId() == 0) {
+ command.setCommandId(getNextCommandId());
+ }
+ command.setResponseRequired(true);
+ FutureResponse future = new FutureResponse();
+ oneway(command, future);
+ return future;
+ }
+
+ public Response request(Command command) throws IOException {
+ FutureResponse response = asyncRequest(command);
+ return response.getResult();
+ }
+
+ public Response request(Command command, int timeout) throws IOException {
+ FutureResponse response = asyncRequest(command);
+ return response.getResult(timeout);
+ }
+
+
+ public void setStartupRunnable(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
/**
* @return pretty print of 'this'
*/
@@ -124,6 +193,9 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
*/
public void run() {
log.trace("Consumer thread starting for: " + toString());
+ if (runnable != null) {
+ runnable.run();
+ }
while (!isStopped()) {
try {
Command command = commandChannel.read();
@@ -135,7 +207,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
stop();
}
catch (Exception e2) {
- log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+ log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
}
catch (SocketException e) {
@@ -145,23 +217,32 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
stop();
}
catch (Exception e2) {
- log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+ log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
}
- catch (Exception e) {
- System.out.println("Caught exception of type: " + e.getClass());
- e.printStackTrace();
+ catch (EOFException e) {
+ // DataInputStream closed
+ log.debug("Socket closed: " + e, e);
try {
stop();
}
catch (Exception e2) {
- log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+ log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
+ }
+ }
+ catch (Exception e) {
+ try {
+ stop();
+ }
+ catch (Exception e2) {
+ log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
if (e instanceof IOException) {
onException((IOException) e);
}
else {
- log.error(e);
+ log.error("Caught: " + e, e);
+ e.printStackTrace();
}
}
}
@@ -182,6 +263,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
originalTargetAddress = targetAddress;
}
targetAddress = address;
+ commandChannel.setTargetAddress(address);
}
}
}
@@ -324,12 +406,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- return new CommandDatagramChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
+ return new CommandDatagramChannel(this, channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
}
protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
channel.configureBlocking(true);
-
+
if (log.isDebugEnabled()) {
log.debug("Binding to address: " + localAddress);
}
@@ -340,7 +422,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
// TODO
// connect to default target address to avoid security checks each time
// channel = channel.connect(targetAddress);
-
+
return channel;
}
@@ -370,7 +452,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return targetAddress;
}
- public void setCommandChannel(CommandChannel commandChannel) {
- this.commandChannel = commandChannel;
+ protected synchronized int getNextCommandId() {
+ return ++lastCommandId;
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
index cfa9d799f8..b0889beffb 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
@@ -17,6 +17,8 @@
package org.apache.activemq.transport.udp;
import org.activeio.command.WireFormat;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
@@ -24,9 +26,13 @@ import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
+import org.apache.activemq.transport.reliable.ReliableTransport;
+import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@@ -85,6 +91,9 @@ public class UdpTransportFactory extends TransportFactory {
if (udpTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
}
+
+ // TODO should we have this?
+ //transport = udpTransport.createFilter(transport);
return transport;
}
@@ -96,6 +105,7 @@ public class UdpTransportFactory extends TransportFactory {
protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) {
IntrospectionSupport.setProperties(transport, options);
UdpTransport udpTransport = (UdpTransport) transport;
+ OpenWireFormat openWireFormat = asOpenWireFormat(format);
if (udpTransport.isTrace()) {
transport = new TransportLogger(transport);
@@ -109,24 +119,46 @@ public class UdpTransportFactory extends TransportFactory {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
}
- transport = new ResponseCorrelator(transport);
-
+ // add reliabilty
+ //transport = new ReliableTransport(transport, createReplayStrategy());
+
// deal with fragmentation
- transport = new CommandJoiner(transport, asOpenWireFormat(format));
+ transport = new CommandJoiner(transport, openWireFormat);
+
+ transport = udpTransport.createFilter(transport);
return transport;
}
+ protected ReplayStrategy createReplayStrategy() {
+ return new ExceptionIfDroppedReplayStrategy(1);
+ }
+
protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
+ return new TransportFilter(transport) {
+
+ public void onCommand(Command command) {
+ // redirect to the endpoint that the last response came from
+ Endpoint from = command.getFrom();
+ udpTransport.setTargetEndpoint(from);
+
+ super.onCommand(command);
+ }
+
+ };
+ /*
transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
protected void onWireFormatNegotiated(WireFormatInfo info) {
// lets switch to the target endpoint
// based on the last packet that was received
// so that all future requests go to the newly created UDP channel
- udpTransport.setTargetEndpoint(info.getFrom());
+ Endpoint from = info.getFrom();
+ System.out.println("####Ęsetting the client side target to: " + from);
+ udpTransport.setTargetEndpoint(from);
}
};
return transport;
+ */
}
protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
index ca96f4886f..999cb1125f 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
@@ -24,6 +24,7 @@ import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport;
@@ -49,13 +50,13 @@ public class UdpTransportServer extends TransportServerSupport {
private UdpTransport serverTransport;
private Transport configuredTransport;
+ private boolean usingWireFormatNegotiation;
private Map transports = new HashMap();
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) {
super(connectURI);
this.serverTransport = serverTransport;
-
-
+
this.configuredTransport = configuredTransport;
// lets disable the incremental checking of the sequence numbers
@@ -86,6 +87,7 @@ public class UdpTransportServer extends TransportServerSupport {
}
public void onException(IOException error) {
+ log.error("Caught: " + error, error);
}
public void transportInterupted() {
@@ -110,16 +112,22 @@ public class UdpTransportServer extends TransportServerSupport {
synchronized (transports) {
transport = (Transport) transports.get(endpoint);
if (transport == null) {
- if (log.isDebugEnabled()) {
- log.debug("Creating a new UDP server connection");
+ if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
+ log.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
}
- try {
- transport = createTransport(command, endpoint);
- transport = configureTransport(transport);
- transports.put(endpoint, transport);
- }
- catch (IOException e) {
- getAcceptListener().onAcceptError(e);
+ else {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a new UDP server connection");
+ }
+ try {
+ transport = createTransport(command, endpoint);
+ transport = configureTransport(transport);
+ transports.put(endpoint, transport);
+ }
+ catch (IOException e) {
+ log.error("Caught: " + e, e);
+ getAcceptListener().onAcceptError(e);
+ }
}
}
else {
@@ -129,7 +137,7 @@ public class UdpTransportServer extends TransportServerSupport {
}
protected Transport configureTransport(Transport transport) {
- transport = new ResponseCorrelator(transport);
+ // transport = new ResponseCorrelator(transport);
if (serverTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
@@ -140,26 +148,54 @@ public class UdpTransportServer extends TransportServerSupport {
}
protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
+ if (endpoint == null) {
+ //log.error("No endpoint available for command: " + command);
+ throw new IOException("No endpoint available for command: " + command);
+ }
final SocketAddress address = endpoint.getAddress();
final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat);
-
- return new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
+
+ // lets pass in the received transport
+ return new TransportFilter(configuredTransport) {
+ public void start() throws Exception {
+ super.start();
+ onCommand(command);
+ }
+ };
+
+ /**
+ // return configuredTransport;
+
+ // configuredTransport = transport.createFilter(configuredTransport);
+
+ final WireFormatNegotiator wireFormatNegotiator = new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport
+ .getMinmumWireFormatVersion()) {
public void start() throws Exception {
super.start();
-
- // process the inbound wireformat
+ System.out.println("Starting a new server transport: " + this + " with command: " + command);
onCommand(command);
}
// lets use the specific addressing of wire format
protected void sendWireFormat(WireFormatInfo info) throws IOException {
+ System.out.println("#### we have negotiated the wireformat; sending a wireformat to: " + address);
transport.oneway(info, address);
}
};
+ return wireFormatNegotiator;
+ */
+
+ /*
+ * transport.setStartupRunnable(new Runnable() {
+ *
+ * public void run() { System.out.println("Passing the incoming
+ * WireFormat into into: " + this);
+ * // process the inbound wireformat
+ * wireFormatNegotiator.onCommand(command); }});
+ */
}
-
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
index 4b3952e26f..8fd69bd1cd 100755
--- a/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
@@ -165,7 +165,7 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
* Waits for the messages to be delivered or when the wait time has been reached.
*/
protected void waitForMessagesToBeDelivered() {
- long maxWaitTime = 30000;
+ long maxWaitTime = 60000;
long waitTime = maxWaitTime;
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
diff --git a/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java b/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
index e31704ba5a..b495788820 100755
--- a/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
@@ -44,10 +44,14 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes
connectionFactory = createConnectionFactory();
+ log.info("Creating send connection");
sendConnection = createSendConnection();
+ log.info("Starting send connection");
sendConnection.start();
+ log.info("Creating receive connection");
receiveConnection = createReceiveConnection();
+ log.info("Starting receive connection");
receiveConnection.start();
log.info("Created sendConnection: " + sendConnection);
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.java
new file mode 100644
index 0000000000..1efdf252eb
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest extends UdpSendReceiveWithTwoConnectionsTest {
+
+ protected void setUp() throws Exception {
+ largeMessages = true;
+ //messageCount = 1;
+ super.setUp();
+ }
+
+}
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
index a8fd6f42f3..55bdb321f6 100755
--- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
@@ -29,7 +29,6 @@ public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWit
protected BrokerService broker;
protected void setUp() throws Exception {
- largeMessages = true;
broker = createBroker();
broker.start();
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
index 1248392dcc..036863c55c 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
@@ -21,6 +21,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@@ -189,6 +190,11 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
System.out.println("Got WireFormatInfo: " + command);
}
else {
+ if (command.isResponseRequired()) {
+ // lets send a response back...
+ sendResponse(command);
+
+ }
if (large) {
System.out.println("### Received command: " + command.getClass() + " with id: " + command.getCommandId());
}
@@ -203,6 +209,19 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
}
}
+ protected void sendResponse(Command command) {
+ Response response = new Response();
+ response.setCorrelationId(command.getCommandId());
+ try {
+ consumer.oneway(response);
+ }
+ catch (IOException e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
public void onException(IOException error) {
System.out.println("### Received error: " + error);
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
index 9fc9184b48..cfdb33edbb 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport.udp;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
@@ -32,6 +34,18 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
protected String producerURI = "udp://localhost:" + consumerPort;
protected String serverURI = producerURI;
+ public void testRequestResponse() throws Exception {
+ ConsumerInfo expected = new ConsumerInfo();
+ expected.setSelector("Edam");
+ expected.setResponseRequired(true);
+ System.out.println("About to send: " + expected);
+ Response response = producer.request(expected, 2000);
+
+ System.out.println("Received: " + response);
+ assertNotNull("Received a response", response);
+ assertTrue("Should not be an exception", !response.isException());
+ }
+
protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI);
URI uri = new URI(producerURI);
@@ -41,7 +55,7 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
protected TransportServer createServer() throws Exception {
return TransportFactory.bind("byBroker", new URI(serverURI));
}
-
+
protected Transport createConsumer() throws Exception {
return null;
}