mirror of
https://github.com/apache/activemq.git
synced 2025-02-16 23:16:52 +00:00
applied patch for AMQ-1440 and AMQ-1439
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@586251 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fe8bc3317c
commit
1ea24bff37
@ -301,6 +301,14 @@ public class FanoutTransport implements CompositeTransport {
|
||||
reconnectTask.shutdown();
|
||||
}
|
||||
|
||||
public int getMinAckCount() {
|
||||
return minAckCount;
|
||||
}
|
||||
|
||||
public void setMinAckCount(int minAckCount) {
|
||||
this.minAckCount = minAckCount;
|
||||
}
|
||||
|
||||
public long getInitialReconnectDelay() {
|
||||
return initialReconnectDelay;
|
||||
}
|
||||
@ -338,24 +346,14 @@ public class FanoutTransport implements CompositeTransport {
|
||||
try {
|
||||
synchronized (reconnectMutex) {
|
||||
|
||||
// If it was a request and it was not being tracked by
|
||||
// the state tracker,
|
||||
// then hold it in the requestMap so that we can replay
|
||||
// it later.
|
||||
boolean fanout = isFanoutCommand(command);
|
||||
if (stateTracker.track(command) == null && command.isResponseRequired()) {
|
||||
int size = fanout ? minAckCount : 1;
|
||||
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
|
||||
}
|
||||
|
||||
// Wait for transport to be connected.
|
||||
while (connectedCount != minAckCount && !disposed && connectionFailure == null) {
|
||||
while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
|
||||
LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
|
||||
reconnectMutex.wait(1000);
|
||||
}
|
||||
|
||||
// Still not fully connected.
|
||||
if (connectedCount != minAckCount) {
|
||||
if (connectedCount < minAckCount) {
|
||||
|
||||
Exception error;
|
||||
|
||||
@ -374,6 +372,16 @@ public class FanoutTransport implements CompositeTransport {
|
||||
throw IOExceptionSupport.create(error);
|
||||
}
|
||||
|
||||
// If it was a request and it was not being tracked by
|
||||
// the state tracker,
|
||||
// then hold it in the requestMap so that we can replay
|
||||
// it later.
|
||||
boolean fanout = isFanoutCommand(command);
|
||||
if (stateTracker.track(command) == null && command.isResponseRequired()) {
|
||||
int size = fanout ? minAckCount : 1;
|
||||
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
|
||||
}
|
||||
|
||||
// Send the message.
|
||||
if (fanout) {
|
||||
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
|
||||
@ -543,4 +551,5 @@ public class FanoutTransport implements CompositeTransport {
|
||||
public boolean isFaultTolerant() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user