mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3331 - make bridge alwaysSyncSend=true the default
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1441545 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6028ec4d35
commit
f383ca1de4
|
@ -965,25 +965,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
+ message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
|
||||
}
|
||||
|
||||
if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
|
||||
|
||||
// If the message was originally sent using async send, we will
|
||||
// preserve that QOS by bridging it using an async send (small chance
|
||||
// of message loss).
|
||||
try {
|
||||
remoteBroker.oneway(message);
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
dequeueCounter.incrementAndGet();
|
||||
} finally {
|
||||
sub.decrementOutstandingResponses();
|
||||
}
|
||||
|
||||
} else {
|
||||
if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
|
||||
|
||||
// The message was not sent using async send, so we should only
|
||||
// ack the local broker when we get confirmation that the remote
|
||||
// broker has received the message.
|
||||
ResponseCallback callback = new ResponseCallback() {
|
||||
remoteBroker.asyncRequest(message, new ResponseCallback() {
|
||||
@Override
|
||||
public void onCompletion(FutureResponse future) {
|
||||
try {
|
||||
|
@ -1001,9 +988,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
sub.decrementOutstandingResponses();
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
remoteBroker.asyncRequest(message, callback);
|
||||
} else {
|
||||
// If the message was originally sent using async send, we will
|
||||
// preserve that QOS by bridging it using an async send (small chance
|
||||
// of message loss).
|
||||
try {
|
||||
remoteBroker.oneway(message);
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
dequeueCounter.incrementAndGet();
|
||||
} finally {
|
||||
sub.decrementOutstandingResponses();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -54,7 +54,7 @@ public class NetworkBridgeConfiguration {
|
|||
private boolean suppressDuplicateQueueSubscriptions = false;
|
||||
private boolean suppressDuplicateTopicSubscriptions = true;
|
||||
|
||||
private boolean alwaysSyncSend = false;
|
||||
private boolean alwaysSyncSend = true;
|
||||
private boolean staticBridge = false;
|
||||
private boolean useCompression = false;
|
||||
private boolean advisoryForFailedForward = false;
|
||||
|
|
Loading…
Reference in New Issue