optionally replay messages on the fault tolerant transport

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@631244 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-26 15:00:37 +00:00
parent 25b6812daa
commit e225ba622c
3 changed files with 85 additions and 14 deletions

View File

@ -18,6 +18,8 @@ package org.apache.activemq.state;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -28,6 +30,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
@ -54,6 +57,16 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
private boolean restoreConsumers = true; private boolean restoreConsumers = true;
private boolean restoreProducers = true; private boolean restoreProducers = true;
private boolean restoreTransaction = true; private boolean restoreTransaction = true;
private boolean trackMessages = true;
private int maxCacheSize = 128 * 1024;
private int currentCacheSize;
private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
boolean result = currentCacheSize > maxCacheSize;
currentCacheSize -= eldest.getValue().getSize();
return result;
}
};
private class RemoveTransactionAction implements Runnable { private class RemoveTransactionAction implements Runnable {
@ -87,6 +100,15 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
} }
public void trackBack(Command command) {
if (trackMessages && command != null && command.isMessage()) {
Message message = (Message) command;
if (message.getTransactionId()==null) {
currentCacheSize+=message.getSize();
}
}
}
public void restore(Transport transport) throws IOException { public void restore(Transport transport) throws IOException {
// Restore the connections. // Restore the connections.
for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) { for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
@ -102,6 +124,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
restoreTransactions(transport, connectionState); restoreTransactions(transport, connectionState);
} }
} }
//now flush messages
for (Message msg:messageCache.values()) {
transport.oneway(msg);
}
} }
private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
@ -311,7 +337,8 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
public Response processMessage(Message send) throws Exception { public Response processMessage(Message send) throws Exception {
if (trackTransactions && send != null && send.getTransactionId() != null) { if (send != null) {
if (trackTransactions && send.getTransactionId() != null) {
ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
if (connectionId != null) { if (connectionId != null) {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);
@ -323,6 +350,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
} }
return TRACKED_RESPONSE_MARKER; return TRACKED_RESPONSE_MARKER;
}else if (trackMessages) {
messageCache.put(send.getMessageId(), send.copy());
}
} }
return null; return null;
} }
@ -483,4 +513,20 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
this.restoreTransaction = restoreTransaction; this.restoreTransaction = restoreTransaction;
} }
public boolean isTrackMessages() {
return trackMessages;
}
public void setTrackMessages(boolean trackMessages) {
this.trackMessages = trackMessages;
}
public int getMaxCacheSize() {
return maxCacheSize;
}
public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
} }

View File

@ -93,7 +93,7 @@ public class ResponseCorrelator extends TransportFilter {
future.set(response); future.set(response);
} else { } else {
if (debug) { if (debug) {
LOG.debug("Received unexpected response for command id: " + response.getCorrelationId()); LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
} }
} }
} else { } else {

View File

@ -32,6 +32,7 @@ import java.util.concurrent.ThreadFactory;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker; import org.apache.activemq.state.ConnectionStateTracker;
@ -92,6 +93,8 @@ public class FailoverTransport implements CompositeTransport {
private boolean backup=false; private boolean backup=false;
private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>(); private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize=1; private int backupPoolSize=1;
private boolean trackMessages = true;
private int maxCacheSize = 128 * 1024;
private final TransportListener myTransportListener = createTransportListener(); private final TransportListener myTransportListener = createTransportListener();
@ -223,6 +226,8 @@ public class FailoverTransport implements CompositeTransport {
return; return;
} }
started = true; started = true;
stateTracker.setMaxCacheSize(getMaxCacheSize());
stateTracker.setTrackMessages(isTrackMessages());
if (connectedTransport != null) { if (connectedTransport != null) {
stateTracker.restore(connectedTransport); stateTracker.restore(connectedTransport);
} else { } else {
@ -336,6 +341,22 @@ public class FailoverTransport implements CompositeTransport {
this.backupPoolSize = backupPoolSize; this.backupPoolSize = backupPoolSize;
} }
public boolean isTrackMessages() {
return trackMessages;
}
public void setTrackMessages(boolean trackMessages) {
this.trackMessages = trackMessages;
}
public int getMaxCacheSize() {
return maxCacheSize;
}
public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
/** /**
* @return Returns true if the command is one sent when a connection * @return Returns true if the command is one sent when a connection
* is being closed. * is being closed.
@ -407,6 +428,7 @@ public class FailoverTransport implements CompositeTransport {
// Send the message. // Send the message.
try { try {
connectedTransport.oneway(command); connectedTransport.oneway(command);
stateTracker.trackBack(command);
} catch (IOException e) { } catch (IOException e) {
// If the command was not tracked.. we will retry in // If the command was not tracked.. we will retry in
@ -548,6 +570,10 @@ public class FailoverTransport implements CompositeTransport {
protected void restoreTransport(Transport t) throws Exception, IOException { protected void restoreTransport(Transport t) throws Exception, IOException {
t.start(); t.start();
//send information to the broker - informing it we are an ft client
ConnectionControl cc = new ConnectionControl();
cc.setFaultTolerant(true);
t.oneway(cc);
stateTracker.restore(t); stateTracker.restore(t);
for (Iterator<Command> iter2 = requestMap.values().iterator(); iter2.hasNext();) { for (Iterator<Command> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
Command command = iter2.next(); Command command = iter2.next();
@ -753,5 +779,4 @@ public class FailoverTransport implements CompositeTransport {
public void reconnect(URI uri) throws IOException { public void reconnect(URI uri) throws IOException {
add(new URI[] {uri}); add(new URI[] {uri});
} }
} }