added a replay buffer to the ReliableTransport so that nodes with missing messages can re-request stuff

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386198 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-15 22:43:40 +00:00
parent 3dd6d8afcb
commit 8d589ec6fa
14 changed files with 532 additions and 154 deletions

View File

@ -18,10 +18,8 @@ package org.apache.activemq.transport.multicast;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.CommandChannel; import org.apache.activemq.transport.udp.CommandChannel;
import org.apache.activemq.transport.udp.CommandDatagramChannel;
import org.apache.activemq.transport.udp.CommandDatagramSocket; import org.apache.activemq.transport.udp.CommandDatagramSocket;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller; import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
import org.apache.activemq.transport.udp.DefaultBufferPool;
import org.apache.activemq.transport.udp.UdpTransport; import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -36,7 +34,6 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.DatagramChannel;
/** /**
* A multicast based transport. * A multicast based transport.
@ -75,7 +72,7 @@ public class MulticastTransport extends UdpTransport {
super.doStop(stopper); super.doStop(stopper);
if (socket != null) { if (socket != null) {
try { try {
socket.leaveGroup(mcastAddress); socket.leaveGroup(getMulticastAddress());
} }
catch (IOException e) { catch (IOException e) {
stopper.onException(this, e); stopper.onException(this, e);
@ -89,11 +86,15 @@ public class MulticastTransport extends UdpTransport {
socket.setLoopbackMode(loopBackMode); socket.setLoopbackMode(loopBackMode);
socket.setTimeToLive(timeToLive); socket.setTimeToLive(timeToLive);
log.debug("Joining multicast address: " + mcastAddress); log.debug("Joining multicast address: " + getMulticastAddress());
socket.joinGroup(mcastAddress); socket.joinGroup(getMulticastAddress());
socket.setSoTimeout((int) keepAliveInterval); socket.setSoTimeout((int) keepAliveInterval);
return new CommandDatagramSocket(this, socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller()); return new CommandDatagramSocket( this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), socket);
}
protected InetAddress getMulticastAddress() {
return mcastAddress;
} }
protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException { protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {

View File

@ -0,0 +1,76 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*
* @version $Revision$
*/
public class DefaultReplayBuffer implements ReplayBuffer {
private final int size;
private ReplayBufferListener listener;
private Map map;
private int lowestCommandId = 1;
private Object lock = new Object();
public DefaultReplayBuffer(int size) {
this.size = size;
map = createMap(size);
}
public void addBuffer(int commandId, Object buffer) {
synchronized (lock) {
int max = size - 1;
while (map.size() >= max) {
// lets find things to evict
Object evictedBuffer = map.remove(new Integer(++lowestCommandId));
onEvictedBuffer(lowestCommandId, evictedBuffer);
}
map.put(new Integer(commandId), buffer);
}
}
public void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter) {
this.listener = bufferPoolAdapter;
}
public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException {
for (int i = fromCommandId; i <= toCommandId; i++) {
Object buffer = null;
synchronized (lock) {
buffer = map.get(new Integer(i));
}
replayer.sendBuffer(i, buffer);
}
}
protected Map createMap(int maximumSize) {
return new HashMap(maximumSize);
}
protected void onEvictedBuffer(int commandId, Object buffer) {
if (listener != null) {
listener.onBufferDiscarded(commandId, buffer);
}
}
}

View File

@ -0,0 +1,69 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import java.io.IOException;
/**
* Throws an exception if packets are dropped causing the transport to be
* closed.
*
* @version $Revision$
*/
public class DefaultReplayStrategy implements ReplayStrategy {
private int maximumDifference = 5;
public DefaultReplayStrategy() {
}
public DefaultReplayStrategy(int maximumDifference) {
this.maximumDifference = maximumDifference;
}
public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
int difference = actualCounter - expectedCounter;
long count = Math.abs(difference);
if (count > maximumDifference) {
int upperLimit = actualCounter;
if (upperLimit < expectedCounter) {
upperLimit = expectedCounter;
}
transport.requestReplay(expectedCounter, upperLimit );
}
// lets discard old commands
return difference > 0;
}
public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
// TODO we could pro-actively evict stuff from the buffer if we knew there was only one client
}
public int getMaximumDifference() {
return maximumDifference;
}
/**
* Sets the maximum allowed difference between an expected packet and an
* actual packet before an error occurs
*/
public void setMaximumDifference(int maximumDifference) {
this.maximumDifference = maximumDifference;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.reliable; package org.apache.activemq.transport.reliable;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ReplayCommand;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.CommandIdComparator; import org.apache.activemq.openwire.CommandIdComparator;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
@ -42,7 +43,10 @@ public class ReliableTransport extends ResponseCorrelator {
private ReplayStrategy replayStrategy; private ReplayStrategy replayStrategy;
private SortedSet commands = new TreeSet(new CommandIdComparator()); private SortedSet commands = new TreeSet(new CommandIdComparator());
private int expectedCounter = 1; private int expectedCounter = 1;
private int replayBufferCommandCount = 50;
private int requestTimeout = 2000; private int requestTimeout = 2000;
private ReplayBuffer replayBuffer;
private Replayer replayer;
public ReliableTransport(Transport next, ReplayStrategy replayStrategy) { public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
super(next); super(next);
@ -54,6 +58,21 @@ public class ReliableTransport extends ResponseCorrelator {
this.replayStrategy = replayStrategy; this.replayStrategy = replayStrategy;
} }
/**
* Requests that a range of commands be replayed
*/
public void requestReplay(int fromCommandId, int toCommandId) {
ReplayCommand replay = new ReplayCommand();
replay.setFirstNakNumber(fromCommandId);
replay.setLastNakNumber(toCommandId);
try {
oneway(replay);
}
catch (IOException e) {
getTransportListener().onException(e);
}
}
public Response request(Command command) throws IOException { public Response request(Command command) throws IOException {
FutureResponse response = asyncRequest(command); FutureResponse response = asyncRequest(command);
@ -62,7 +81,7 @@ public class ReliableTransport extends ResponseCorrelator {
if (result != null) { if (result != null) {
return result; return result;
} }
replayRequest(command, response); onMissingResponse(command, response);
} }
} }
@ -77,7 +96,7 @@ public class ReliableTransport extends ResponseCorrelator {
if (result != null) { if (result != null) {
return result; return result;
} }
replayRequest(command, response); onMissingResponse(command, response);
timeout -= time; timeout -= time;
} }
return response.getResult(0); return response.getResult(0);
@ -89,6 +108,10 @@ public class ReliableTransport extends ResponseCorrelator {
super.onCommand(command); super.onCommand(command);
return; return;
} }
else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
replayCommands((ReplayCommand) command);
return;
}
int actualCounter = command.getCommandId(); int actualCounter = command.getCommandId();
boolean valid = expectedCounter == actualCounter; boolean valid = expectedCounter == actualCounter;
@ -107,7 +130,7 @@ public class ReliableTransport extends ResponseCorrelator {
} }
} }
catch (IOException e) { catch (IOException e) {
getTransportListener().onException(e); onException(e);
} }
if (!commands.isEmpty()) { if (!commands.isEmpty()) {
@ -180,13 +203,61 @@ public class ReliableTransport extends ResponseCorrelator {
} }
public ReplayBuffer getReplayBuffer() {
return replayBuffer;
}
public void setReplayBuffer(ReplayBuffer replayBuffer) {
this.replayBuffer = replayBuffer;
}
public int getReplayBufferCommandCount() {
return replayBufferCommandCount;
}
/**
* Sets the default number of commands which are buffered
*/
public void setReplayBufferCommandCount(int replayBufferSize) {
this.replayBufferCommandCount = replayBufferSize;
}
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }
public void start() throws Exception {
super.start();
if (replayBuffer == null) {
replayBuffer = createReplayBuffer();
}
}
/** /**
* Lets attempt to replay the request as a command may have disappeared * Lets attempt to replay the request as a command may have disappeared
*/ */
protected void replayRequest(Command command, FutureResponse response) { protected void onMissingResponse(Command command, FutureResponse response) {
log.debug("Still waiting for response on: " + this + " to command: " + command); log.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
int commandId = command.getCommandId();
requestReplay(commandId, commandId);
} }
protected ReplayBuffer createReplayBuffer() {
return new DefaultReplayBuffer(getReplayBufferCommandCount());
}
protected void replayCommands(ReplayCommand command) {
try {
replayBuffer.replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
// TODO we could proactively remove ack'd stuff from the replay buffer
// if we only have a single client talking to us
}
catch (IOException e) {
onException(e);
}
}
} }

View File

@ -0,0 +1,39 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import java.io.IOException;
/**
* This class keeps around a buffer of old commands which have been sent on
* an unreliable transport. The buffers are of type Object as they could be datagrams
* or byte[] or ByteBuffer - depending on the underlying transport implementation.
*
* @version $Revision$
*/
public interface ReplayBuffer {
/**
* Submit a buffer for caching around for a period of time, during which time it can be replayed
* to users interested in it.
*/
public void addBuffer(int commandId, Object buffer);
public void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter);
public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException;
}

View File

@ -0,0 +1,31 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
/**
* Listens to events on a {@link ReplayBuffer}
*
* @version $Revision$
*/
public interface ReplayBufferListener {
/**
* Indications that the buffer has been discarded and so could be
* re-introduced into some pool
*/
public void onBufferDiscarded(int commandId, Object buffer);
}

View File

@ -0,0 +1,37 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.reliable;
import java.io.IOException;
/**
* Used by a {@link ReplayBuffer} to replay buffers back over an unreliable transport
*
* @version $Revision$
*/
public interface Replayer {
/**
* Sends the given buffer back to the transport
* if the buffer could be found - otherwise maybe send some kind
* of exception
*
* @param commandId the command ID
* @param buffer the buffer to be sent - or null if the buffer no longer exists in the buffer
*/
void sendBuffer(int commandId, Object buffer) throws IOException;
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.transport.reliable.Replayer;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -26,7 +27,7 @@ import java.net.SocketAddress;
* *
* @version $Revision$ * @version $Revision$
*/ */
public interface CommandChannel extends Service { public interface CommandChannel extends Replayer, Service {
public abstract Command read() throws IOException; public abstract Command read() throws IOException;
@ -45,4 +46,5 @@ public interface CommandChannel extends Service {
public abstract void setTargetAddress(SocketAddress address); public abstract void setTargetAddress(SocketAddress address);
public abstract void setReplayAddress(SocketAddress address);
} }

View File

@ -0,0 +1,101 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IntSequenceGenerator;
import java.io.IOException;
import java.net.SocketAddress;
/**
*
* @version $Revision$
*/
public abstract class CommandChannelSupport implements CommandChannel {
protected OpenWireFormat wireFormat;
protected int datagramSize = 4 * 1024;
protected SocketAddress targetAddress;
protected SocketAddress replayAddress;
protected final String name;
protected final IntSequenceGenerator sequenceGenerator;
protected DatagramHeaderMarshaller headerMarshaller;
public CommandChannelSupport(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
DatagramHeaderMarshaller headerMarshaller) {
this.wireFormat = wireFormat;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller;
this.name = transport.toString();
this.sequenceGenerator = transport.getSequenceGenerator();
this.replayAddress = targetAddress;
if (sequenceGenerator == null) {
throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
}
}
public void write(Command command) throws IOException {
write(command, targetAddress);
}
// Properties
// -------------------------------------------------------------------------
public int getDatagramSize() {
return datagramSize;
}
/**
* Sets the default size of a datagram on the network.
*/
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
}
public SocketAddress getTargetAddress() {
return targetAddress;
}
public void setTargetAddress(SocketAddress targetAddress) {
this.targetAddress = targetAddress;
}
public SocketAddress getReplayAddress() {
return replayAddress;
}
public void setReplayAddress(SocketAddress replayAddress) {
this.replayAddress = replayAddress;
}
public String toString() {
return "CommandChannel#" + name;
}
public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller;
}
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
}

View File

@ -24,7 +24,6 @@ import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,54 +39,30 @@ import java.nio.channels.DatagramChannel;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class CommandDatagramChannel implements CommandChannel { public class CommandDatagramChannel extends CommandChannelSupport {
private static final Log log = LogFactory.getLog(CommandDatagramChannel.class); private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
private final String name;
private final IntSequenceGenerator sequenceGenerator;
private DatagramChannel channel; private DatagramChannel channel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool; private ByteBufferPool bufferPool;
private int datagramSize = 4 * 1024;
private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller;
// reading // reading
private Object readLock = new Object(); private Object readLock = new Object();
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
// writing // writing
private Object writeLock = new Object(); private Object writeLock = new Object();
private ByteBuffer writeBuffer;
private int defaultMarshalBufferSize = 64 * 1024; private int defaultMarshalBufferSize = 64 * 1024;
public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel, ByteBufferPool bufferPool) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
public CommandDatagramChannel(UdpTransport transport, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller;
this.name = transport.toString();
this.sequenceGenerator = transport.getSequenceGenerator();
if (sequenceGenerator == null) {
throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
}
}
public String toString() {
return "CommandChannel#" + name;
} }
public void start() throws Exception { public void start() throws Exception {
bufferPool.setDefaultSize(datagramSize); bufferPool.setDefaultSize(datagramSize);
bufferPool.start(); bufferPool.start();
readBuffer = bufferPool.borrowBuffer(); readBuffer = bufferPool.borrowBuffer();
writeBuffer = bufferPool.borrowBuffer();
} }
public void stop() throws Exception { public void stop() throws Exception {
@ -132,6 +107,7 @@ public class CommandDatagramChannel implements CommandChannel {
return answer; return answer;
} }
public void write(Command command, SocketAddress address) throws IOException { public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
@ -140,6 +116,7 @@ public class CommandDatagramChannel implements CommandChannel {
byte[] data = largeBuffer.toByteArray(); byte[] data = largeBuffer.toByteArray();
int size = data.length; int size = data.length;
ByteBuffer writeBuffer = bufferPool.borrowBuffer();
writeBuffer.clear(); writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer); headerMarshaller.writeHeader(command, writeBuffer);
@ -215,7 +192,7 @@ public class CommandDatagramChannel implements CommandChannel {
writeBuffer.put(data, offset, chunkSize); writeBuffer.put(data, offset, chunkSize);
offset += chunkSize; offset += chunkSize;
sendWriteBuffer(address, commandId); sendWriteBuffer(address, writeBuffer, commandId);
} }
// now lets write the last partial command // now lets write the last partial command
@ -231,21 +208,13 @@ public class CommandDatagramChannel implements CommandChannel {
} }
writeBuffer.put(data); writeBuffer.put(data);
sendWriteBuffer(address, command.getCommandId()); sendWriteBuffer(address, writeBuffer, command.getCommandId());
} }
} }
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
public int getDatagramSize() {
return datagramSize;
}
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
}
public ByteBufferPool getBufferPool() { public ByteBufferPool getBufferPool() {
return bufferPool; return bufferPool;
} }
@ -257,32 +226,23 @@ public class CommandDatagramChannel implements CommandChannel {
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
} }
public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller;
}
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
public SocketAddress getTargetAddress() {
return targetAddress;
}
public void setTargetAddress(SocketAddress targetAddress) {
this.targetAddress = targetAddress;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address, int commandId) throws IOException { protected void sendWriteBuffer(SocketAddress address, ByteBuffer writeBuffer, int commandId) throws IOException {
writeBuffer.flip(); writeBuffer.flip();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address); log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
} }
channel.send(writeBuffer, address); channel.send(writeBuffer, address);
// now lets put the buffer back into the replay buffer
} }
public void sendBuffer(int commandId, Object buffer) throws IOException {
ByteBuffer writeBuffer = (ByteBuffer) buffer;
sendWriteBuffer(getReplayAddress(), writeBuffer, commandId);
}
} }

View File

@ -24,7 +24,6 @@ import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream; import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -33,8 +32,6 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
@ -42,38 +39,18 @@ import java.net.SocketAddress;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class CommandDatagramSocket implements CommandChannel { public class CommandDatagramSocket extends CommandChannelSupport {
private static final Log log = LogFactory.getLog(CommandDatagramSocket.class); private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
private final String name;
private DatagramSocket channel; private DatagramSocket channel;
private InetAddress targetAddress;
private int targetPort;
private OpenWireFormat wireFormat;
private int datagramSize = 4 * 1024;
private DatagramHeaderMarshaller headerMarshaller;
private IntSequenceGenerator sequenceGenerator;
private Object readLock = new Object(); private Object readLock = new Object();
private Object writeLock = new Object(); private Object writeLock = new Object();
public CommandDatagramSocket(UdpTransport transport, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
int targetPort, DatagramHeaderMarshaller headerMarshaller) { DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.targetPort = targetPort;
this.headerMarshaller = headerMarshaller;
this.name = transport.toString();
this.sequenceGenerator = transport.getSequenceGenerator();
if (sequenceGenerator == null) {
throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
}
}
public String toString() {
return "CommandChannel#" + name;
} }
public void start() throws Exception { public void start() throws Exception {
@ -110,11 +87,6 @@ public class CommandDatagramSocket implements CommandChannel {
} }
public void write(Command command, SocketAddress address) throws IOException { public void write(Command command, SocketAddress address) throws IOException {
InetSocketAddress ia = (InetSocketAddress) address;
write(command, ia.getAddress(), ia.getPort());
}
public void write(Command command, InetAddress address, int port) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
ByteArrayOutputStream writeBuffer = createByteArrayOutputStream(); ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
@ -126,7 +98,7 @@ public class CommandDatagramSocket implements CommandChannel {
wireFormat.marshal(command, dataOut); wireFormat.marshal(command, dataOut);
if (remaining(writeBuffer) >= 0) { if (remaining(writeBuffer) >= 0) {
sendWriteBuffer(address, port, writeBuffer, command.getCommandId()); sendWriteBuffer(address, writeBuffer, command.getCommandId());
} }
else { else {
// lets split the command up into chunks // lets split the command up into chunks
@ -197,7 +169,7 @@ public class CommandDatagramSocket implements CommandChannel {
dataOut.write(data, offset, chunkSize); dataOut.write(data, offset, chunkSize);
offset += chunkSize; offset += chunkSize;
sendWriteBuffer(address, port, writeBuffer, commandId); sendWriteBuffer(address, writeBuffer, commandId);
} }
// now lets write the last partial command // now lets write the last partial command
@ -208,60 +180,39 @@ public class CommandDatagramSocket implements CommandChannel {
headerMarshaller.writeHeader(command, dataOut); headerMarshaller.writeHeader(command, dataOut);
wireFormat.marshal(command, dataOut); wireFormat.marshal(command, dataOut);
sendWriteBuffer(address, port, writeBuffer, command.getCommandId()); sendWriteBuffer(address, writeBuffer, command.getCommandId());
} }
} }
} }
// Properties
// -------------------------------------------------------------------------
public int getDatagramSize() { public int getDatagramSize() {
return datagramSize; return datagramSize;
} }
/**
* Sets the default size of a datagram on the network.
*/
public void setDatagramSize(int datagramSize) { public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
} }
public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller;
}
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
public SocketAddress getTargetAddress() {
return new InetSocketAddress(targetAddress, targetPort);
}
public void setTargetAddress(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetSocketAddress ia = (InetSocketAddress) address;
targetAddress = ia.getAddress();
targetPort = ia.getPort();
}
else {
throw new IllegalArgumentException("Address must be instance of InetSocketAddress");
}
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer, int commandId) throws IOException { protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
byte[] data = writeBuffer.toByteArray();
sendWriteBuffer(address, commandId, data);
}
protected void sendWriteBuffer(SocketAddress address, int commandId, byte[] data) throws IOException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address); log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
} }
byte[] data = writeBuffer.toByteArray(); DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port);
channel.send(packet); channel.send(packet);
} }
public void sendBuffer(int commandId, Object buffer) throws IOException {
byte[] data = (byte[]) buffer;
sendWriteBuffer(replayAddress, commandId, data);
}
protected DatagramPacket createDatagramPacket() { protected DatagramPacket createDatagramPacket() {
return new DatagramPacket(new byte[datagramSize], datagramSize); return new DatagramPacket(new byte[datagramSize], datagramSize);
} }

View File

@ -24,6 +24,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.util.IntSequenceGenerator; import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -66,6 +67,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private String description = null; private String description = null;
private Runnable runnable; private Runnable runnable;
private IntSequenceGenerator sequenceGenerator; private IntSequenceGenerator sequenceGenerator;
private boolean replayEnabled = true;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException { protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
@ -93,6 +95,18 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.description = getProtocolName() + "Server@"; this.description = getProtocolName() + "Server@";
} }
/**
* Creates a replayer for working with the reliable transport
* @return
*/
public Replayer createReplayer() {
if (replayEnabled ) {
return commandChannel;
}
return null;
}
/** /**
* A one way asynchronous send * A one way asynchronous send
*/ */
@ -311,6 +325,19 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
this.sequenceGenerator = sequenceGenerator; this.sequenceGenerator = sequenceGenerator;
} }
public boolean isReplayEnabled() {
return replayEnabled;
}
/**
* Sets whether or not replay should be enabled when using the reliable transport.
* i.e. should we maintain a buffer of messages that can be replayed?
*/
public void setReplayEnabled(boolean replayEnabled) {
this.replayEnabled = replayEnabled;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -354,7 +381,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) { if (bufferPool == null) {
bufferPool = new DefaultBufferPool(); bufferPool = new DefaultBufferPool();
} }
return new CommandDatagramChannel(this, channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); return new CommandDatagramChannel(this, wireFormat, datagramSize, targetAddress, createDatagramHeaderMarshaller(), channel, bufferPool);
} }
protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException { protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {

View File

@ -27,9 +27,11 @@ import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport; import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntSequenceGenerator; import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
@ -56,15 +58,7 @@ public class UdpTransportFactory extends TransportFactory {
UdpTransport transport = new UdpTransport(openWireFormat, port); UdpTransport transport = new UdpTransport(openWireFormat, port);
Transport configuredTransport = configure(transport, wf, options, true); Transport configuredTransport = configure(transport, wf, options, true);
ReplayStrategy replayStrategy = null; UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
if (configuredTransport instanceof ReliableTransport) {
ReliableTransport rt = (ReliableTransport) configuredTransport;
replayStrategy = rt.getReplayStrategy();
}
if (replayStrategy == null) {
replayStrategy = createReplayStrategy();
}
UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, replayStrategy);
return server; return server;
} }
catch (URISyntaxException e) { catch (URISyntaxException e) {
@ -106,16 +100,25 @@ public class UdpTransportFactory extends TransportFactory {
return new UdpTransport(wireFormat, location); return new UdpTransport(wireFormat, location);
} }
protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) { /**
* Configures the transport
*
* @param acceptServer
* true if this transport is used purely as an 'accept' transport
* for new connections which work like TCP SocketServers where
* new connections spin up a new separate UDP transport
*/
protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) {
IntrospectionSupport.setProperties(transport, options); IntrospectionSupport.setProperties(transport, options);
UdpTransport udpTransport = (UdpTransport) transport; UdpTransport udpTransport = (UdpTransport) transport;
OpenWireFormat openWireFormat = asOpenWireFormat(format); OpenWireFormat openWireFormat = asOpenWireFormat(format);
if (udpTransport.isTrace()) { if (udpTransport.isTrace()) {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
if (!server && format instanceof OpenWireFormat) { if (!acceptServer && format instanceof OpenWireFormat) {
transport = configureClientSideNegotiator(transport, format, udpTransport); transport = configureClientSideNegotiator(transport, format, udpTransport);
} }
@ -125,7 +128,11 @@ public class UdpTransportFactory extends TransportFactory {
// deal with fragmentation // deal with fragmentation
if (server) { if (acceptServer) {
// lets not support a buffer of messages to enable reliable
// messaging on the 'accept server' transport
udpTransport.setReplayEnabled(true);
// we don't want to do reliable checks on this transport as we // we don't want to do reliable checks on this transport as we
// delegate to one that does // delegate to one that does
transport = new CommandJoiner(transport, openWireFormat); transport = new CommandJoiner(transport, openWireFormat);
@ -133,7 +140,8 @@ public class UdpTransportFactory extends TransportFactory {
return transport; return transport;
} }
else { else {
ReliableTransport reliableTransport = new ReliableTransport(transport, createReplayStrategy()); Replayer replayer = udpTransport.createReplayer();
ReliableTransport reliableTransport = new ReliableTransport(transport, createReplayStrategy(replayer));
udpTransport.setSequenceGenerator(reliableTransport.getSequenceGenerator()); udpTransport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
// Joiner must be on outside as the inbound messages must be // Joiner must be on outside as the inbound messages must be
@ -142,10 +150,17 @@ public class UdpTransportFactory extends TransportFactory {
} }
} }
protected ReplayStrategy createReplayStrategy() { protected ReplayStrategy createReplayStrategy(Replayer replayer) {
if (replayer != null) {
return new DefaultReplayStrategy(5);
}
return new ExceptionIfDroppedReplayStrategy(1); return new ExceptionIfDroppedReplayStrategy(1);
} }
protected ReplayStrategy createReplayStrategy() {
return new DefaultReplayStrategy(5);
}
protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) { protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
return new TransportFilter(transport) { return new TransportFilter(transport) {

View File

@ -25,14 +25,12 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener; import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import javax.jms.MessageNotWriteableException; import javax.jms.MessageNotWriteableException;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import junit.framework.TestCase; import junit.framework.TestCase;