fixed the UDP and multicast test cases by merging the ResponseCorrelator into the UDP transport implementation

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-14 12:31:44 +00:00
parent daf48bb297
commit 72b382d0bd
23 changed files with 471 additions and 144 deletions

View File

@ -360,10 +360,6 @@
<exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
<!-- TODO FIX ME -->
<exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
<exclude>**/MulticastTransportTest.*</exclude>
</excludes> </excludes>
</unitTest> </unitTest>
<resources> <resources>

View File

@ -33,4 +33,9 @@ public class BaseEndpoint implements Endpoint {
return name; return name;
} }
public String toString() {
return "Endpoint[" + name + "]";
}
} }

View File

@ -31,9 +31,8 @@ public class LastPartialCommand extends BaseCommand {
public LastPartialCommand() { public LastPartialCommand() {
} }
public LastPartialCommand(Command command) { public LastPartialCommand(boolean responseRequired) {
setCommandId(command.getCommandId()); setResponseRequired(responseRequired);
setResponseRequired(command.isResponseRequired());
} }
public byte getDataStructureType() { public byte getDataStructureType() {
@ -44,4 +43,22 @@ public class LastPartialCommand extends BaseCommand {
throw new IllegalStateException("The transport layer should filter out LastPartialCommand instances but received: " + this); throw new IllegalStateException("The transport layer should filter out LastPartialCommand instances but received: " + this);
} }
/**
* Lets copy across the required fields from this last partial command to
* the newly unmarshalled complete command
*
* @param completeCommand the newly unmarshalled complete command
*/
public void configure(Command completeCommand) {
// overwrite the commandId as the numbers change when we introduce
// fragmentation commands
completeCommand.setCommandId(getCommandId());
// copy across the transient properties
completeCommand.setFrom(getFrom());
// TODO should not be required as the large command would be marshalled with this property
//completeCommand.setResponseRequired(isResponseRequired());
}
} }

View File

@ -128,4 +128,10 @@ public class PartialCommand implements Command {
public boolean isMarshallAware() { public boolean isMarshallAware() {
return false; return false;
} }
public String toString() {
return "PartialCommand[id: " + commandId + " data: " + data.length + " byte(s)]";
}
} }

View File

@ -21,6 +21,8 @@ import org.apache.activemq.command.Command;
import org.apache.activemq.command.LastPartialCommand; import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -33,6 +35,7 @@ import java.io.IOException;
* @version $Revision$ * @version $Revision$
*/ */
public class CommandJoiner extends TransportFilter { public class CommandJoiner extends TransportFilter {
private static final Log log = LogFactory.getLog(CommandJoiner.class);
private ByteArrayOutputStream out = new ByteArrayOutputStream(); private ByteArrayOutputStream out = new ByteArrayOutputStream();
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
@ -57,13 +60,16 @@ public class CommandJoiner extends TransportFilter {
else if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) { else if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
try { try {
byte[] fullData = out.toByteArray(); byte[] fullData = out.toByteArray();
out.reset();
Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData))); Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData)));
completeCommand.setCommandId(command.getCommandId());
completeCommand.setResponseRequired(command.isResponseRequired()); LastPartialCommand lastCommand = (LastPartialCommand) command;
resetBuffer(); lastCommand.configure(completeCommand);
getTransportListener().onCommand(completeCommand); getTransportListener().onCommand(completeCommand);
} }
catch (IOException e) { catch (IOException e) {
log.warn("Failed to unmarshal partial command: " + command);
getTransportListener().onException(e); getTransportListener().onException(e);
} }
} }
@ -74,14 +80,10 @@ public class CommandJoiner extends TransportFilter {
public void stop() throws Exception { public void stop() throws Exception {
super.stop(); super.stop();
resetBuffer(); out = null;
} }
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }
protected void resetBuffer() {
out.reset();
}
} }

View File

@ -32,14 +32,14 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
* *
* @version $Revision: 1.4 $ * @version $Revision: 1.4 $
*/ */
final public class ResponseCorrelator extends TransportFilter { public class ResponseCorrelator extends TransportFilter {
private static final Log log = LogFactory.getLog(ResponseCorrelator.class); private static final Log log = LogFactory.getLog(ResponseCorrelator.class);
private final ConcurrentHashMap requestMap = new ConcurrentHashMap(); private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
private int lastCommandId = 0; private int lastCommandId = 0;
synchronized int getNextCommandId() { public synchronized int getNextCommandId() {
return ++lastCommandId; return ++lastCommandId;
} }
@ -48,13 +48,19 @@ final public class ResponseCorrelator extends TransportFilter {
} }
public void oneway(Command command) throws IOException { public void oneway(Command command) throws IOException {
command.setCommandId(getNextCommandId()); // a parent transport could have set the ID
if (command.getCommandId() == 0) {
command.setCommandId(getNextCommandId());
}
command.setResponseRequired(false); command.setResponseRequired(false);
next.oneway(command); next.oneway(command);
} }
public FutureResponse asyncRequest(Command command) throws IOException { public FutureResponse asyncRequest(Command command) throws IOException {
command.setCommandId(getNextCommandId()); // a parent transport could have set the ID
if (command.getCommandId() == 0) {
command.setCommandId(getNextCommandId());
}
command.setResponseRequired(true); command.setResponseRequired(true);
FutureResponse future = new FutureResponse(); FutureResponse future = new FutureResponse();
requestMap.put(new Integer(command.getCommandId()), future); requestMap.put(new Integer(command.getCommandId()), future);

View File

@ -88,15 +88,15 @@ public class WireFormatNegotiator extends TransportFilter {
try { try {
wireInfoSentDownLatch.await(); wireInfoSentDownLatch.await();
if (log.isDebugEnabled()) {
log.debug(this + " before negotiation: " + wireFormat);
}
if( !info.isValid() ) { if( !info.isValid() ) {
onException(new IOException("Remote wire format magic is invalid")); onException(new IOException("Remote wire format magic is invalid"));
} else if( info.getVersion() < minimumVersion ) { } else if( info.getVersion() < minimumVersion ) {
onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")")); onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")"));
} }
if (log.isDebugEnabled()) {
log.debug(this + " before negotiation: " + wireFormat);
}
wireFormat.renegociatWireFormat(info); wireFormat.renegociatWireFormat(info);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {

View File

@ -93,7 +93,7 @@ public class MulticastTransport extends UdpTransport {
socket.joinGroup(mcastAddress); socket.joinGroup(mcastAddress);
socket.setSoTimeout((int) keepAliveInterval); socket.setSoTimeout((int) keepAliveInterval);
return new CommandDatagramSocket(toString(), socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller()); return new CommandDatagramSocket(this, socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
} }
protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException { protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {

View File

@ -28,13 +28,20 @@ public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy {
private int maximumDifference = 5; private int maximumDifference = 5;
public ExceptionIfDroppedReplayStrategy() {
}
public ExceptionIfDroppedReplayStrategy(int maximumDifference) {
this.maximumDifference = maximumDifference;
}
public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException { public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
int difference = actualCounter - expectedCounter; int difference = actualCounter - expectedCounter;
long count = Math.abs(difference); long count = Math.abs(difference);
if (count > maximumDifference) { if (count > maximumDifference) {
throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter); throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
} }
// lets discard old commands // lets discard old commands
return difference > 0; return difference > 0;
} }

View File

@ -17,9 +17,13 @@
package org.apache.activemq.transport.reliable; package org.apache.activemq.transport.reliable;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.CommandIdComparator; import org.apache.activemq.openwire.CommandIdComparator;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException; import java.io.IOException;
import java.util.SortedSet; import java.util.SortedSet;
@ -31,17 +35,54 @@ import java.util.TreeSet;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class ReliableTransport extends TransportFilter { public class ReliableTransport extends ResponseCorrelator {
private static final Log log = LogFactory.getLog(ReliableTransport.class);
private ReplayStrategy replayStrategy; private ReplayStrategy replayStrategy;
private SortedSet commands = new TreeSet(new CommandIdComparator()); private SortedSet commands = new TreeSet(new CommandIdComparator());
private int expectedCounter = 1; private int expectedCounter = 1;
private int requestTimeout = 2000;
public ReliableTransport(Transport next, ReplayStrategy replayStrategy) { public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
super(next); super(next);
this.replayStrategy = replayStrategy; this.replayStrategy = replayStrategy;
} }
public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command);
while (true) {
Response result = response.getResult(requestTimeout);
if (result != null) {
return result;
}
replayRequest(command, response);
}
}
public Response request(Command command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command);
while (timeout > 0) {
int time = timeout;
if (timeout > requestTimeout) {
time = requestTimeout;
}
Response result = response.getResult(time);
if (result != null) {
return result;
}
replayRequest(command, response);
timeout -= time;
}
return response.getResult(0);
}
public void onCommand(Command command) { public void onCommand(Command command) {
// lets pass wireformat through
if (command.isWireFormatInfo()) {
super.onCommand(command);
return;
}
int actualCounter = command.getCommandId(); int actualCounter = command.getCommandId();
boolean valid = expectedCounter == actualCounter; boolean valid = expectedCounter == actualCounter;
@ -49,9 +90,12 @@ public class ReliableTransport extends TransportFilter {
synchronized (commands) { synchronized (commands) {
try { try {
boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter); boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
if (keep) { if (keep) {
// lets add it to the list for later on // lets add it to the list for later on
if (log.isDebugEnabled()) {
log.debug("Received out of order command which is being buffered for later: " + command);
}
commands.add(command); commands.add(command);
} }
} }
@ -75,7 +119,7 @@ public class ReliableTransport extends TransportFilter {
// we've got a valid header so increment counter // we've got a valid header so increment counter
replayStrategy.onReceivedPacket(this, expectedCounter); replayStrategy.onReceivedPacket(this, expectedCounter);
expectedCounter++; expectedCounter++;
getTransportListener().onCommand(command); super.onCommand(command);
synchronized (commands) { synchronized (commands) {
// we could have more commands left // we could have more commands left
@ -98,13 +142,14 @@ public class ReliableTransport extends TransportFilter {
return commands.size(); return commands.size();
} }
} }
public int getExpectedCounter() { public int getExpectedCounter() {
return expectedCounter; return expectedCounter;
} }
/** /**
* This property should never really be set - but is mutable primarily for test cases * This property should never really be set - but is mutable primarily for
* test cases
*/ */
public void setExpectedCounter(int expectedCounter) { public void setExpectedCounter(int expectedCounter) {
this.expectedCounter = expectedCounter; this.expectedCounter = expectedCounter;
@ -114,4 +159,10 @@ public class ReliableTransport extends TransportFilter {
return next.toString(); return next.toString();
} }
/**
* Lets attempt to replay the request as a command may have disappeared
*/
protected void replayRequest(Command command, FutureResponse response) {
}
} }

View File

@ -16,11 +16,14 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map;
/** /**
* *
@ -30,9 +33,7 @@ public interface CommandChannel extends Service {
public abstract Command read() throws IOException; public abstract Command read() throws IOException;
public abstract void write(Command command) throws IOException; public abstract void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException;
public abstract void write(Command command, SocketAddress address) throws IOException;
public abstract int getDatagramSize(); public abstract int getDatagramSize();
@ -45,4 +46,6 @@ public interface CommandChannel extends Service {
public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller); public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
public abstract void setTargetAddress(SocketAddress address);
} }

View File

@ -16,9 +16,10 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import org.activeio.ByteArrayInputStream; import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream; import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand; import org.apache.activemq.command.LastPartialCommand;
@ -34,6 +35,7 @@ import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.util.Map;
/** /**
* A strategy for reading datagrams and de-fragmenting them together. * A strategy for reading datagrams and de-fragmenting them together.
@ -44,6 +46,7 @@ public class CommandDatagramChannel implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramChannel.class); private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
private final UdpTransport transport;
private final String name; private final String name;
private DatagramChannel channel; private DatagramChannel channel;
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
@ -61,15 +64,17 @@ public class CommandDatagramChannel implements CommandChannel {
private ByteBuffer writeBuffer; private ByteBuffer writeBuffer;
private int defaultMarshalBufferSize = 64 * 1024; private int defaultMarshalBufferSize = 64 * 1024;
public CommandDatagramChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
public CommandDatagramChannel(UdpTransport transport, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) { SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
this.name = name; this.transport = transport;
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
this.targetAddress = targetAddress; this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
this.name = transport.toString();
} }
public String toString() { public String toString() {
@ -87,34 +92,21 @@ public class CommandDatagramChannel implements CommandChannel {
bufferPool.stop(); bufferPool.stop();
} }
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#read()
*/
public Command read() throws IOException { public Command read() throws IOException {
Command answer = null; Command answer = null;
Endpoint from = null;
synchronized (readLock) { synchronized (readLock) {
while (true) { while (true) {
readBuffer.clear(); readBuffer.clear();
SocketAddress address = channel.receive(readBuffer); SocketAddress address = channel.receive(readBuffer);
/*
if (address == null) {
System.out.println("No address on packet: " + readBuffer);
// continue;
}
*/
readBuffer.flip(); readBuffer.flip();
if (readBuffer.limit() == 0) { if (readBuffer.limit() == 0) {
//System.out.println("Empty packet!");
continue; continue;
} }
from = headerMarshaller.createEndpoint(readBuffer, address);
//log.debug("buffer: " + readBuffer + " has remaining: " + readBuffer.remaining());
Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
int remaining = readBuffer.remaining(); int remaining = readBuffer.remaining();
byte[] data = new byte[remaining]; byte[] data = new byte[remaining];
readBuffer.get(data); readBuffer.get(data);
@ -125,33 +117,25 @@ public class CommandDatagramChannel implements CommandChannel {
// buffering? // buffering?
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
answer = (Command) wireFormat.unmarshal(dataIn); answer = (Command) wireFormat.unmarshal(dataIn);
if (answer != null) {
answer.setFrom(from);
}
break; break;
} }
} }
if (answer != null) { if (answer != null) {
answer.setFrom(from);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer); log.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
} }
} }
return answer; return answer;
} }
/* (non-Javadoc) public void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException {
* @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command)
*/
public void write(Command command) throws IOException {
write(command, targetAddress);
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command, java.net.SocketAddress)
*/
public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
if (!command.isWireFormatInfo() && command.getCommandId() == 0) {
command.setCommandId(transport.getNextCommandId());
}
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer)); wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray(); byte[] data = largeBuffer.toByteArray();
@ -160,7 +144,7 @@ public class CommandDatagramChannel implements CommandChannel {
writeBuffer.clear(); writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer); headerMarshaller.writeHeader(command, writeBuffer);
if (size >= writeBuffer.remaining()) { if (size > writeBuffer.remaining()) {
// lets split the command up into chunks // lets split the command up into chunks
int offset = 0; int offset = 0;
boolean lastFragment = false; boolean lastFragment = false;
@ -216,7 +200,11 @@ public class CommandDatagramChannel implements CommandChannel {
bs.marshal(writeBuffer); bs.marshal(writeBuffer);
} }
writeBuffer.putInt(command.getCommandId()); int commandId = command.getCommandId();
if (fragment > 0) {
commandId = transport.getNextCommandId();
}
writeBuffer.putInt(commandId);
if (bs == null) { if (bs == null) {
writeBuffer.put((byte) 1); writeBuffer.put((byte) 1);
} }
@ -232,7 +220,9 @@ public class CommandDatagramChannel implements CommandChannel {
} }
// now lets write the last partial command // now lets write the last partial command
command = new LastPartialCommand(command); command = new LastPartialCommand(command.isResponseRequired());
command.setCommandId(transport.getNextCommandId());
largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer)); wireFormat.marshal(command, new DataOutputStream(largeBuffer));
data = largeBuffer.toByteArray(); data = largeBuffer.toByteArray();
@ -243,6 +233,9 @@ public class CommandDatagramChannel implements CommandChannel {
writeBuffer.put(data); writeBuffer.put(data);
if (command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), future);
}
sendWriteBuffer(address); sendWriteBuffer(address);
} }
} }
@ -250,16 +243,10 @@ public class CommandDatagramChannel implements CommandChannel {
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#getDatagramSize()
*/
public int getDatagramSize() { public int getDatagramSize() {
return datagramSize; return datagramSize;
} }
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#setDatagramSize(int)
*/
public void setDatagramSize(int datagramSize) { public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
} }
@ -275,20 +262,23 @@ public class CommandDatagramChannel implements CommandChannel {
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
} }
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#getHeaderMarshaller()
*/
public DatagramHeaderMarshaller getHeaderMarshaller() { public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller; return headerMarshaller;
} }
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#setHeaderMarshaller(org.apache.activemq.transport.udp.DatagramHeaderMarshaller)
*/
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) { public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
} }
public SocketAddress getTargetAddress() {
return targetAddress;
}
public void setTargetAddress(SocketAddress targetAddress) {
this.targetAddress = targetAddress;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException { protected void sendWriteBuffer(SocketAddress address) throws IOException {

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Future;
import org.activeio.ByteArrayInputStream; import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream; import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -24,6 +27,8 @@ import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -35,6 +40,7 @@ import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map;
/** /**
* A strategy for reading datagrams and de-fragmenting them together. * A strategy for reading datagrams and de-fragmenting them together.
@ -45,6 +51,7 @@ public class CommandDatagramSocket implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramSocket.class); private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
private final UdpTransport transport;
private final String name; private final String name;
private DatagramSocket channel; private DatagramSocket channel;
private InetAddress targetAddress; private InetAddress targetAddress;
@ -59,16 +66,16 @@ public class CommandDatagramSocket implements CommandChannel {
// writing // writing
private Object writeLock = new Object(); private Object writeLock = new Object();
public CommandDatagramSocket(UdpTransport transport, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress,
public CommandDatagramSocket(String name, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, int targetPort, int targetPort, DatagramHeaderMarshaller headerMarshaller) {
DatagramHeaderMarshaller headerMarshaller) { this.transport = transport;
this.name = name;
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
this.targetAddress = targetAddress; this.targetAddress = targetAddress;
this.targetPort = targetPort; this.targetPort = targetPort;
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
this.name = transport.toString();
} }
public String toString() { public String toString() {
@ -100,7 +107,7 @@ public class CommandDatagramSocket implements CommandChannel {
} }
if (answer != null) { if (answer != null) {
answer.setFrom(from); answer.setFrom(from);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer); log.debug("Channel: " + name + " about to process: " + answer);
} }
@ -108,23 +115,15 @@ public class CommandDatagramSocket implements CommandChannel {
return answer; return answer;
} }
public void write(Command command) throws IOException { public void write(Command command, SocketAddress address, Map requestMap, Future future) throws IOException {
write(command, targetAddress, targetPort); InetSocketAddress ia = (InetSocketAddress) address;
write(command, ia.getAddress(), ia.getPort(), requestMap, future);
} }
public void write(Command command, SocketAddress address) throws IOException { public void write(Command command, InetAddress address, int port, Map requestMap, Future future) throws IOException {
if (address instanceof InetSocketAddress) {
InetSocketAddress ia = (InetSocketAddress) address;
write(command, ia.getAddress(), ia.getPort());
}
else {
write(command);
}
}
public void write(Command command, InetAddress address, int port) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
command.setCommandId(transport.getNextCommandId());
ByteArrayOutputStream writeBuffer = createByteArrayOutputStream(); ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(writeBuffer); DataOutputStream dataOut = new DataOutputStream(writeBuffer);
headerMarshaller.writeHeader(command, dataOut); headerMarshaller.writeHeader(command, dataOut);
@ -134,6 +133,9 @@ public class CommandDatagramSocket implements CommandChannel {
wireFormat.marshal(command, dataOut); wireFormat.marshal(command, dataOut);
if (remaining(writeBuffer) >= 0) { if (remaining(writeBuffer) >= 0) {
if (command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), future);
}
sendWriteBuffer(address, port, writeBuffer); sendWriteBuffer(address, port, writeBuffer);
} }
else { else {
@ -189,7 +191,11 @@ public class CommandDatagramSocket implements CommandChannel {
bs.marshal(dataOut); bs.marshal(dataOut);
} }
dataOut.writeInt(command.getCommandId()); int commandId = command.getCommandId();
if (fragment > 0) {
commandId = transport.getNextCommandId();
}
dataOut.writeInt(commandId);
if (bs == null) { if (bs == null) {
dataOut.write((byte) 1); dataOut.write((byte) 1);
} }
@ -205,12 +211,16 @@ public class CommandDatagramSocket implements CommandChannel {
} }
// now lets write the last partial command // now lets write the last partial command
command = new LastPartialCommand(command); command = new LastPartialCommand(command.isResponseRequired());
command.setCommandId(transport.getNextCommandId());
writeBuffer.reset(); writeBuffer.reset();
headerMarshaller.writeHeader(command, dataOut); headerMarshaller.writeHeader(command, dataOut);
wireFormat.marshal(command, dataOut); wireFormat.marshal(command, dataOut);
if (command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), future);
}
sendWriteBuffer(address, port, writeBuffer); sendWriteBuffer(address, port, writeBuffer);
} }
} }
@ -238,6 +248,22 @@ public class CommandDatagramSocket implements CommandChannel {
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
} }
public SocketAddress getTargetAddress() {
return new InetSocketAddress(targetAddress, targetPort);
}
public void setTargetAddress(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetSocketAddress ia = (InetSocketAddress) address;
targetAddress = ia.getAddress();
targetPort = ia.getPort();
}
else {
throw new IllegalArgumentException("Address must be instance of InetSocketAddress");
}
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException { protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException {

View File

@ -38,4 +38,5 @@ public class DatagramEndpoint extends BaseEndpoint {
return address; return address;
} }
} }

View File

@ -16,11 +16,17 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
@ -28,6 +34,7 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
@ -62,6 +69,10 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private int port; private int port;
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private String description = null; private String description = null;
private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
private int lastCommandId = 0;
private Runnable runnable;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException { protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
@ -89,6 +100,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.description = getProtocolName() + "Server@"; this.description = getProtocolName() + "Server@";
} }
public TransportFilter createFilter(Transport transport) {
return new TransportFilter(transport) {
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if (command.isResponse()) {
Response response = (Response) command;
FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
if (future != null) {
future.set(response);
}
else {
if (debug)
log.debug("Received unexpected response for command id: " + response.getCorrelationId());
}
}
else {
super.onCommand(command);
}
}
};
}
/** /**
* A one way asynchronous send * A one way asynchronous send
*/ */
@ -96,17 +129,53 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
oneway(command, targetAddress); oneway(command, targetAddress);
} }
/**
* A one way asynchronous send
*/
public void oneway(Command command, FutureResponse future) throws IOException {
oneway(command, targetAddress, future);
}
protected void oneway(Command command, SocketAddress address, FutureResponse future) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
}
checkStarted(command);
commandChannel.write(command, address, requestMap, future);
}
/** /**
* A one way asynchronous send to a given address * A one way asynchronous send to a given address
*/ */
public void oneway(Command command, SocketAddress address) throws IOException { public void oneway(Command command, SocketAddress address) throws IOException {
if (log.isDebugEnabled()) { oneway(command, address, null);
log.debug("Sending oneway from: " + this + " to target: " + targetAddress);
}
checkStarted(command);
commandChannel.write(command, address);
} }
public FutureResponse asyncRequest(Command command) throws IOException {
if (command.getCommandId() == 0) {
command.setCommandId(getNextCommandId());
}
command.setResponseRequired(true);
FutureResponse future = new FutureResponse();
oneway(command, future);
return future;
}
public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command);
return response.getResult();
}
public Response request(Command command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command);
return response.getResult(timeout);
}
public void setStartupRunnable(Runnable runnable) {
this.runnable = runnable;
}
/** /**
* @return pretty print of 'this' * @return pretty print of 'this'
*/ */
@ -124,6 +193,9 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
*/ */
public void run() { public void run() {
log.trace("Consumer thread starting for: " + toString()); log.trace("Consumer thread starting for: " + toString());
if (runnable != null) {
runnable.run();
}
while (!isStopped()) { while (!isStopped()) {
try { try {
Command command = commandChannel.read(); Command command = commandChannel.read();
@ -135,7 +207,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
stop(); stop();
} }
catch (Exception e2) { catch (Exception e2) {
log.warn("Caught while closing: " + e2 + ". Now Closed", e2); log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
} }
} }
catch (SocketException e) { catch (SocketException e) {
@ -145,23 +217,32 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
stop(); stop();
} }
catch (Exception e2) { catch (Exception e2) {
log.warn("Caught while closing: " + e2 + ". Now Closed", e2); log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
} }
} }
catch (Exception e) { catch (EOFException e) {
System.out.println("Caught exception of type: " + e.getClass()); // DataInputStream closed
e.printStackTrace(); log.debug("Socket closed: " + e, e);
try { try {
stop(); stop();
} }
catch (Exception e2) { catch (Exception e2) {
log.warn("Caught while closing: " + e2 + ". Now Closed", e2); log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
}
catch (Exception e) {
try {
stop();
}
catch (Exception e2) {
log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
} }
if (e instanceof IOException) { if (e instanceof IOException) {
onException((IOException) e); onException((IOException) e);
} }
else { else {
log.error(e); log.error("Caught: " + e, e);
e.printStackTrace();
} }
} }
} }
@ -182,6 +263,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
originalTargetAddress = targetAddress; originalTargetAddress = targetAddress;
} }
targetAddress = address; targetAddress = address;
commandChannel.setTargetAddress(address);
} }
} }
} }
@ -324,12 +406,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) { if (bufferPool == null) {
bufferPool = new DefaultBufferPool(); bufferPool = new DefaultBufferPool();
} }
return new CommandDatagramChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); return new CommandDatagramChannel(this, channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
} }
protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException { protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
channel.configureBlocking(true); channel.configureBlocking(true);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Binding to address: " + localAddress); log.debug("Binding to address: " + localAddress);
} }
@ -340,7 +422,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
// TODO // TODO
// connect to default target address to avoid security checks each time // connect to default target address to avoid security checks each time
// channel = channel.connect(targetAddress); // channel = channel.connect(targetAddress);
return channel; return channel;
} }
@ -370,7 +452,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return targetAddress; return targetAddress;
} }
public void setCommandChannel(CommandChannel commandChannel) { protected synchronized int getNextCommandId() {
this.commandChannel = commandChannel; return ++lastCommandId;
} }
} }

View File

@ -17,6 +17,8 @@
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.activeio.command.WireFormat; import org.activeio.command.WireFormat;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner; import org.apache.activemq.transport.CommandJoiner;
@ -24,9 +26,13 @@ import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
@ -85,6 +91,9 @@ public class UdpTransportFactory extends TransportFactory {
if (udpTransport.getMaxInactivityDuration() > 0) { if (udpTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
} }
// TODO should we have this?
//transport = udpTransport.createFilter(transport);
return transport; return transport;
} }
@ -96,6 +105,7 @@ public class UdpTransportFactory extends TransportFactory {
protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) { protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) {
IntrospectionSupport.setProperties(transport, options); IntrospectionSupport.setProperties(transport, options);
UdpTransport udpTransport = (UdpTransport) transport; UdpTransport udpTransport = (UdpTransport) transport;
OpenWireFormat openWireFormat = asOpenWireFormat(format);
if (udpTransport.isTrace()) { if (udpTransport.isTrace()) {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
@ -109,24 +119,46 @@ public class UdpTransportFactory extends TransportFactory {
transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
} }
transport = new ResponseCorrelator(transport); // add reliabilty
//transport = new ReliableTransport(transport, createReplayStrategy());
// deal with fragmentation // deal with fragmentation
transport = new CommandJoiner(transport, asOpenWireFormat(format)); transport = new CommandJoiner(transport, openWireFormat);
transport = udpTransport.createFilter(transport);
return transport; return transport;
} }
protected ReplayStrategy createReplayStrategy() {
return new ExceptionIfDroppedReplayStrategy(1);
}
protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) { protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
return new TransportFilter(transport) {
public void onCommand(Command command) {
// redirect to the endpoint that the last response came from
Endpoint from = command.getFrom();
udpTransport.setTargetEndpoint(from);
super.onCommand(command);
}
};
/*
transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
protected void onWireFormatNegotiated(WireFormatInfo info) { protected void onWireFormatNegotiated(WireFormatInfo info) {
// lets switch to the target endpoint // lets switch to the target endpoint
// based on the last packet that was received // based on the last packet that was received
// so that all future requests go to the newly created UDP channel // so that all future requests go to the newly created UDP channel
udpTransport.setTargetEndpoint(info.getFrom()); Endpoint from = info.getFrom();
System.out.println("####Êsetting the client side target to: " + from);
udpTransport.setTargetEndpoint(from);
} }
}; };
return transport; return transport;
*/
} }
protected OpenWireFormat asOpenWireFormat(WireFormat wf) { protected OpenWireFormat asOpenWireFormat(WireFormat wf) {

View File

@ -24,6 +24,7 @@ import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport; import org.apache.activemq.transport.TransportServerSupport;
@ -49,13 +50,13 @@ public class UdpTransportServer extends TransportServerSupport {
private UdpTransport serverTransport; private UdpTransport serverTransport;
private Transport configuredTransport; private Transport configuredTransport;
private boolean usingWireFormatNegotiation;
private Map transports = new HashMap(); private Map transports = new HashMap();
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) { public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport) {
super(connectURI); super(connectURI);
this.serverTransport = serverTransport; this.serverTransport = serverTransport;
this.configuredTransport = configuredTransport; this.configuredTransport = configuredTransport;
// lets disable the incremental checking of the sequence numbers // lets disable the incremental checking of the sequence numbers
@ -86,6 +87,7 @@ public class UdpTransportServer extends TransportServerSupport {
} }
public void onException(IOException error) { public void onException(IOException error) {
log.error("Caught: " + error, error);
} }
public void transportInterupted() { public void transportInterupted() {
@ -110,16 +112,22 @@ public class UdpTransportServer extends TransportServerSupport {
synchronized (transports) { synchronized (transports) {
transport = (Transport) transports.get(endpoint); transport = (Transport) transports.get(endpoint);
if (transport == null) { if (transport == null) {
if (log.isDebugEnabled()) { if (usingWireFormatNegotiation && !command.isWireFormatInfo()) {
log.debug("Creating a new UDP server connection"); log.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command);
} }
try { else {
transport = createTransport(command, endpoint); if (log.isDebugEnabled()) {
transport = configureTransport(transport); log.debug("Creating a new UDP server connection");
transports.put(endpoint, transport); }
} try {
catch (IOException e) { transport = createTransport(command, endpoint);
getAcceptListener().onAcceptError(e); transport = configureTransport(transport);
transports.put(endpoint, transport);
}
catch (IOException e) {
log.error("Caught: " + e, e);
getAcceptListener().onAcceptError(e);
}
} }
} }
else { else {
@ -129,7 +137,7 @@ public class UdpTransportServer extends TransportServerSupport {
} }
protected Transport configureTransport(Transport transport) { protected Transport configureTransport(Transport transport) {
transport = new ResponseCorrelator(transport); // transport = new ResponseCorrelator(transport);
if (serverTransport.getMaxInactivityDuration() > 0) { if (serverTransport.getMaxInactivityDuration() > 0) {
transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
@ -140,26 +148,54 @@ public class UdpTransportServer extends TransportServerSupport {
} }
protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException { protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException {
if (endpoint == null) {
//log.error("No endpoint available for command: " + command);
throw new IOException("No endpoint available for command: " + command);
}
final SocketAddress address = endpoint.getAddress(); final SocketAddress address = endpoint.getAddress();
final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(connectionWireFormat, address); final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat); Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat);
return new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) { // lets pass in the received transport
return new TransportFilter(configuredTransport) {
public void start() throws Exception {
super.start();
onCommand(command);
}
};
/**
// return configuredTransport;
// configuredTransport = transport.createFilter(configuredTransport);
final WireFormatNegotiator wireFormatNegotiator = new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport
.getMinmumWireFormatVersion()) {
public void start() throws Exception { public void start() throws Exception {
super.start(); super.start();
System.out.println("Starting a new server transport: " + this + " with command: " + command);
// process the inbound wireformat
onCommand(command); onCommand(command);
} }
// lets use the specific addressing of wire format // lets use the specific addressing of wire format
protected void sendWireFormat(WireFormatInfo info) throws IOException { protected void sendWireFormat(WireFormatInfo info) throws IOException {
System.out.println("#### we have negotiated the wireformat; sending a wireformat to: " + address);
transport.oneway(info, address); transport.oneway(info, address);
} }
}; };
return wireFormatNegotiator;
*/
/*
* transport.setStartupRunnable(new Runnable() {
*
* public void run() { System.out.println("Passing the incoming
* WireFormat into into: " + this);
* // process the inbound wireformat
* wireFormatNegotiator.onCommand(command); }});
*/
} }
} }

View File

@ -165,7 +165,7 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
* Waits for the messages to be delivered or when the wait time has been reached. * Waits for the messages to be delivered or when the wait time has been reached.
*/ */
protected void waitForMessagesToBeDelivered() { protected void waitForMessagesToBeDelivered() {
long maxWaitTime = 30000; long maxWaitTime = 60000;
long waitTime = maxWaitTime; long waitTime = maxWaitTime;
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();

View File

@ -44,10 +44,14 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes
connectionFactory = createConnectionFactory(); connectionFactory = createConnectionFactory();
log.info("Creating send connection");
sendConnection = createSendConnection(); sendConnection = createSendConnection();
log.info("Starting send connection");
sendConnection.start(); sendConnection.start();
log.info("Creating receive connection");
receiveConnection = createReceiveConnection(); receiveConnection = createReceiveConnection();
log.info("Starting receive connection");
receiveConnection.start(); receiveConnection.start();
log.info("Created sendConnection: " + sendConnection); log.info("Created sendConnection: " + sendConnection);

View File

@ -0,0 +1,31 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
/**
*
* @version $Revision$
*/
public class UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest extends UdpSendReceiveWithTwoConnectionsTest {
protected void setUp() throws Exception {
largeMessages = true;
//messageCount = 1;
super.setUp();
}
}

View File

@ -29,7 +29,6 @@ public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWit
protected BrokerService broker; protected BrokerService broker;
protected void setUp() throws Exception { protected void setUp() throws Exception {
largeMessages = true;
broker = createBroker(); broker = createBroker();
broker.start(); broker.start();

View File

@ -21,6 +21,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
@ -189,6 +190,11 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
System.out.println("Got WireFormatInfo: " + command); System.out.println("Got WireFormatInfo: " + command);
} }
else { else {
if (command.isResponseRequired()) {
// lets send a response back...
sendResponse(command);
}
if (large) { if (large) {
System.out.println("### Received command: " + command.getClass() + " with id: " + command.getCommandId()); System.out.println("### Received command: " + command.getClass() + " with id: " + command.getCommandId());
} }
@ -203,6 +209,19 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
} }
} }
protected void sendResponse(Command command) {
Response response = new Response();
response.setCorrelationId(command.getCommandId());
try {
consumer.oneway(response);
}
catch (IOException e) {
System.out.println("Caught: " + e);
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void onException(IOException error) { public void onException(IOException error) {
System.out.println("### Received error: " + error); System.out.println("### Received error: " + error);
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
@ -32,6 +34,18 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
protected String producerURI = "udp://localhost:" + consumerPort; protected String producerURI = "udp://localhost:" + consumerPort;
protected String serverURI = producerURI; protected String serverURI = producerURI;
public void testRequestResponse() throws Exception {
ConsumerInfo expected = new ConsumerInfo();
expected.setSelector("Edam");
expected.setResponseRequired(true);
System.out.println("About to send: " + expected);
Response response = producer.request(expected, 2000);
System.out.println("Received: " + response);
assertNotNull("Received a response", response);
assertTrue("Should not be an exception", !response.isException());
}
protected Transport createProducer() throws Exception { protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI); System.out.println("Producer using URI: " + producerURI);
URI uri = new URI(producerURI); URI uri = new URI(producerURI);
@ -41,7 +55,7 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
protected TransportServer createServer() throws Exception { protected TransportServer createServer() throws Exception {
return TransportFactory.bind("byBroker", new URI(serverURI)); return TransportFactory.bind("byBroker", new URI(serverURI));
} }
protected Transport createConsumer() throws Exception { protected Transport createConsumer() throws Exception {
return null; return null;
} }