diff --git a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index d218a75ef4..56aa842b6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -18,6 +18,8 @@ package org.apache.activemq.state; import java.io.IOException; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.command.Command; @@ -28,6 +30,7 @@ import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; @@ -48,14 +51,24 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); - + private boolean trackTransactions; private boolean restoreSessions = true; private boolean restoreConsumers = true; private boolean restoreProducers = true; private boolean restoreTransaction = true; - - + private boolean trackMessages = true; + private int maxCacheSize = 128 * 1024; + private int currentCacheSize; + private Map messageCache = new LinkedHashMap(){ + protected boolean removeEldestEntry(Map.Entry eldest) { + boolean result = currentCacheSize > maxCacheSize; + currentCacheSize -= eldest.getValue().getSize(); + return result; + } + }; + + private class RemoveTransactionAction implements Runnable { private final TransactionInfo info; @@ -86,6 +99,15 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { throw IOExceptionSupport.create(e); } } + + public void trackBack(Command command) { + if (trackMessages && command != null && command.isMessage()) { + Message message = (Message) command; + if (message.getTransactionId()==null) { + currentCacheSize+=message.getSize(); + } + } + } public void restore(Transport transport) throws IOException { // Restore the connections. @@ -102,6 +124,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { restoreTransactions(transport, connectionState); } } + //now flush messages + for (Message msg:messageCache.values()) { + transport.oneway(msg); + } } private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { @@ -311,18 +337,22 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } public Response processMessage(Message send) throws Exception { - if (trackTransactions && send != null && send.getTransactionId() != null) { - ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); - if (connectionId != null) { - ConnectionState cs = connectionStates.get(connectionId); - if (cs != null) { - TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); - if (transactionState != null) { - transactionState.addCommand(send); + if (send != null) { + if (trackTransactions && send.getTransactionId() != null) { + ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); + if (connectionId != null) { + ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(send); + } } } + return TRACKED_RESPONSE_MARKER; + }else if (trackMessages) { + messageCache.put(send.getMessageId(), send.copy()); } - return TRACKED_RESPONSE_MARKER; } return null; } @@ -483,4 +513,20 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { this.restoreTransaction = restoreTransaction; } + public boolean isTrackMessages() { + return trackMessages; + } + + public void setTrackMessages(boolean trackMessages) { + this.trackMessages = trackMessages; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java index 0e4ac667f7..c9795965ff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java @@ -93,7 +93,7 @@ public class ResponseCorrelator extends TransportFilter { future.set(response); } else { if (debug) { - LOG.debug("Received unexpected response for command id: " + response.getCorrelationId()); + LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId()); } } } else { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index ab982a3f62..c6cc9d2ef0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadFactory; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.state.ConnectionStateTracker; @@ -92,6 +93,8 @@ public class FailoverTransport implements CompositeTransport { private boolean backup=false; private List backups=new CopyOnWriteArrayList(); private int backupPoolSize=1; + private boolean trackMessages = true; + private int maxCacheSize = 128 * 1024; private final TransportListener myTransportListener = createTransportListener(); @@ -223,6 +226,8 @@ public class FailoverTransport implements CompositeTransport { return; } started = true; + stateTracker.setMaxCacheSize(getMaxCacheSize()); + stateTracker.setTrackMessages(isTrackMessages()); if (connectedTransport != null) { stateTracker.restore(connectedTransport); } else { @@ -336,6 +341,22 @@ public class FailoverTransport implements CompositeTransport { this.backupPoolSize = backupPoolSize; } + public boolean isTrackMessages() { + return trackMessages; + } + + public void setTrackMessages(boolean trackMessages) { + this.trackMessages = trackMessages; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + /** * @return Returns true if the command is one sent when a connection * is being closed. @@ -407,6 +428,7 @@ public class FailoverTransport implements CompositeTransport { // Send the message. try { connectedTransport.oneway(command); + stateTracker.trackBack(command); } catch (IOException e) { // If the command was not tracked.. we will retry in @@ -548,6 +570,10 @@ public class FailoverTransport implements CompositeTransport { protected void restoreTransport(Transport t) throws Exception, IOException { t.start(); + //send information to the broker - informing it we are an ft client + ConnectionControl cc = new ConnectionControl(); + cc.setFaultTolerant(true); + t.oneway(cc); stateTracker.restore(t); for (Iterator iter2 = requestMap.values().iterator(); iter2.hasNext();) { Command command = iter2.next(); @@ -753,5 +779,4 @@ public class FailoverTransport implements CompositeTransport { public void reconnect(URI uri) throws IOException { add(new URI[] {uri}); } - }