git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@646401 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-09 15:27:42 +00:00
parent 50bc597fa8
commit 6b6cdaee34
1 changed files with 18 additions and 8 deletions

View File

@ -22,9 +22,10 @@ import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
@ -69,7 +70,7 @@ public class FailoverTransport implements CompositeTransport {
private final Object backupMutex = new Object();
private final Object sleepMutex = new Object();
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private final ConcurrentHashMap<Integer, Command> requestMap = new ConcurrentHashMap<Integer, Command>();
private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();
private URI connectedTransportURI;
private URI failedConnectTransportURI;
@ -139,7 +140,10 @@ public class FailoverTransport implements CompositeTransport {
return;
}
if (command.isResponse()) {
Object object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
Object object = null;
synchronized(requestMap) {
object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
}
if (object != null && object.getClass() == Tracked.class) {
((Tracked)object).onResponses();
}
@ -426,10 +430,12 @@ public class FailoverTransport implements CompositeTransport {
// then hold it in the requestMap so that we can replay
// it later.
Tracked tracked = stateTracker.track(command);
if (tracked != null && tracked.isWaitingForResponse()) {
requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
} else if (tracked == null && command.isResponseRequired()) {
requestMap.put(Integer.valueOf(command.getCommandId()), command);
synchronized(requestMap) {
if (tracked != null && tracked.isWaitingForResponse()) {
requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
} else if (tracked == null && command.isResponseRequired()) {
requestMap.put(Integer.valueOf(command.getCommandId()), command);
}
}
// Send the message.
@ -581,7 +587,11 @@ public class FailoverTransport implements CompositeTransport {
cc.setFaultTolerant(true);
t.oneway(cc);
stateTracker.restore(t);
for (Iterator<Command> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
Map tmpMap = null;
synchronized(requestMap) {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
}
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
Command command = iter2.next();
t.oneway(command);
}