mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@650766 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
18f9773326
commit
620b657216
|
@ -152,6 +152,7 @@ public class BrokerService implements Service {
|
|||
private boolean keepDurableSubsActive = true;
|
||||
private boolean useVirtualTopics = true;
|
||||
private boolean useMirroredQueues = false;
|
||||
private boolean useTempMirroredQueues=true;
|
||||
private BrokerId brokerId;
|
||||
private DestinationInterceptor[] destinationInterceptors;
|
||||
private ActiveMQDestination[] destinations;
|
||||
|
@ -1303,6 +1304,14 @@ public class BrokerService implements Service {
|
|||
int timeBeforePurgeTempDestinations) {
|
||||
this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
|
||||
}
|
||||
|
||||
public boolean isUseTempMirroredQueues() {
|
||||
return useTempMirroredQueues;
|
||||
}
|
||||
|
||||
public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
|
||||
this.useTempMirroredQueues = useTempMirroredQueues;
|
||||
}
|
||||
//
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
@ -1902,4 +1911,5 @@ public class BrokerService implements Service {
|
|||
public void setRegionBroker(Broker regionBroker) {
|
||||
this.regionBroker = regionBroker;
|
||||
}
|
||||
|
||||
}
|
|
@ -44,25 +44,27 @@ public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware
|
|||
|
||||
public Destination intercept(final Destination destination) {
|
||||
if (destination.getActiveMQDestination().isQueue()) {
|
||||
try {
|
||||
final Destination mirrorDestination = getMirrorDestination(destination);
|
||||
if (mirrorDestination != null) {
|
||||
return new DestinationFilter(destination) {
|
||||
public void send(ProducerBrokerExchange context, Message message) throws Exception {
|
||||
message.setDestination(mirrorDestination.getActiveMQDestination());
|
||||
mirrorDestination.send(context, message);
|
||||
|
||||
if (isCopyMessage()) {
|
||||
message = message.copy();
|
||||
if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) {
|
||||
try {
|
||||
final Destination mirrorDestination = getMirrorDestination(destination);
|
||||
if (mirrorDestination != null) {
|
||||
return new DestinationFilter(destination) {
|
||||
public void send(ProducerBrokerExchange context, Message message) throws Exception {
|
||||
message.setDestination(mirrorDestination.getActiveMQDestination());
|
||||
mirrorDestination.send(context, message);
|
||||
|
||||
if (isCopyMessage()) {
|
||||
message = message.copy();
|
||||
}
|
||||
message.setDestination(destination.getActiveMQDestination());
|
||||
super.send(context, message);
|
||||
}
|
||||
message.setDestination(destination.getActiveMQDestination());
|
||||
super.send(context, message);
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
|
||||
}
|
||||
}
|
||||
return destination;
|
||||
|
|
Loading…
Reference in New Issue