mirror of https://github.com/apache/activemq.git
Deprecate the copyMessage option and remove usage, always copy a forwarded message.
This commit is contained in:
parent
1ccd17791b
commit
5096463b00
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
* this work for additional information regarding copyright ownership.
|
* this work for additional information regarding copyright ownership.
|
||||||
|
@ -24,20 +24,16 @@ import org.apache.activemq.broker.region.Destination;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.CommandTypes;
|
import org.apache.activemq.command.CommandTypes;
|
||||||
|
|
||||||
/**
|
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public abstract class CompositeDestination implements VirtualDestination {
|
public abstract class CompositeDestination implements VirtualDestination {
|
||||||
|
|
||||||
private String name;
|
private String name;
|
||||||
private Collection forwardTo;
|
private Collection forwardTo;
|
||||||
private boolean forwardOnly = true;
|
private boolean forwardOnly = true;
|
||||||
private boolean copyMessage = true;
|
|
||||||
private boolean concurrentSend = false;
|
private boolean concurrentSend = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Destination intercept(Destination destination) {
|
public Destination intercept(Destination destination) {
|
||||||
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage(), isConcurrentSend());
|
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -83,17 +79,20 @@ public abstract class CompositeDestination implements VirtualDestination {
|
||||||
this.forwardOnly = forwardOnly;
|
this.forwardOnly = forwardOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public boolean isCopyMessage() {
|
public boolean isCopyMessage() {
|
||||||
return copyMessage;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets whether a copy of the message will be sent to each destination.
|
* Sets whether a copy of the message will be sent to each destination.
|
||||||
* Defaults to true so that the forward destination is set as the
|
* Defaults to true so that the forward destination is set as the
|
||||||
* destination of the message
|
* destination of the message
|
||||||
|
*
|
||||||
|
* @deprecated this option will be removed in a later release, message are always copied.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setCopyMessage(boolean copyMessage) {
|
public void setCopyMessage(boolean copyMessage) {
|
||||||
this.copyMessage = copyMessage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -109,7 +108,6 @@ public abstract class CompositeDestination implements VirtualDestination {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActiveMQDestination getMappedDestinations() {
|
public ActiveMQDestination getMappedDestinations() {
|
||||||
|
|
||||||
final ActiveMQDestination[] destinations = new ActiveMQDestination[forwardTo.size()];
|
final ActiveMQDestination[] destinations = new ActiveMQDestination[forwardTo.size()];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (Object dest : forwardTo) {
|
for (Object dest : forwardTo) {
|
||||||
|
@ -148,39 +146,50 @@ public abstract class CompositeDestination implements VirtualDestination {
|
||||||
final int prime = 31;
|
final int prime = 31;
|
||||||
int result = 1;
|
int result = 1;
|
||||||
result = prime * result + (concurrentSend ? 1231 : 1237);
|
result = prime * result + (concurrentSend ? 1231 : 1237);
|
||||||
result = prime * result + (copyMessage ? 1231 : 1237);
|
|
||||||
result = prime * result + (forwardOnly ? 1231 : 1237);
|
result = prime * result + (forwardOnly ? 1231 : 1237);
|
||||||
result = prime * result
|
result = prime * result + ((forwardTo == null) ? 0 : forwardTo.hashCode());
|
||||||
+ ((forwardTo == null) ? 0 : forwardTo.hashCode());
|
|
||||||
result = prime * result + ((name == null) ? 0 : name.hashCode());
|
result = prime * result + ((name == null) ? 0 : name.hashCode());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (this == obj)
|
if (this == obj) {
|
||||||
return true;
|
return true;
|
||||||
if (obj == null)
|
}
|
||||||
|
|
||||||
|
if (obj == null) {
|
||||||
return false;
|
return false;
|
||||||
if (getClass() != obj.getClass())
|
}
|
||||||
|
|
||||||
|
if (getClass() != obj.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
CompositeDestination other = (CompositeDestination) obj;
|
CompositeDestination other = (CompositeDestination) obj;
|
||||||
if (concurrentSend != other.concurrentSend)
|
if (concurrentSend != other.concurrentSend) {
|
||||||
return false;
|
return false;
|
||||||
if (copyMessage != other.copyMessage)
|
}
|
||||||
return false;
|
|
||||||
if (forwardOnly != other.forwardOnly)
|
if (forwardOnly != other.forwardOnly) {
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (forwardTo == null) {
|
if (forwardTo == null) {
|
||||||
if (other.forwardTo != null)
|
if (other.forwardTo != null) {
|
||||||
return false;
|
return false;
|
||||||
} else if (!forwardTo.equals(other.forwardTo))
|
}
|
||||||
|
} else if (!forwardTo.equals(other.forwardTo)) {
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (name == null) {
|
if (name == null) {
|
||||||
if (other.name != null)
|
if (other.name != null)
|
||||||
return false;
|
return false;
|
||||||
} else if (!name.equals(other.name))
|
} else if (!name.equals(other.name)) {
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
* this work for additional information regarding copyright ownership.
|
* this work for additional information regarding copyright ownership.
|
||||||
|
@ -35,24 +35,21 @@ import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
|
||||||
/**
|
/**
|
||||||
* Represents a composite {@link Destination} where send()s are replicated to
|
* Represents a composite {@link Destination} where send()s are replicated to
|
||||||
* each Destination instance.
|
* each Destination instance.
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public class CompositeDestinationFilter extends DestinationFilter {
|
public class CompositeDestinationFilter extends DestinationFilter {
|
||||||
|
|
||||||
private Collection forwardDestinations;
|
private Collection forwardDestinations;
|
||||||
private boolean forwardOnly;
|
private boolean forwardOnly;
|
||||||
private boolean copyMessage;
|
|
||||||
private boolean concurrentSend = false;
|
private boolean concurrentSend = false;
|
||||||
|
|
||||||
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage, boolean concurrentSend) {
|
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
|
||||||
super(next);
|
super(next);
|
||||||
this.forwardDestinations = forwardDestinations;
|
this.forwardDestinations = forwardDestinations;
|
||||||
this.forwardOnly = forwardOnly;
|
this.forwardOnly = forwardOnly;
|
||||||
this.copyMessage = copyMessage;
|
|
||||||
this.concurrentSend = concurrentSend;
|
this.concurrentSend = concurrentSend;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void send(final ProducerBrokerExchange context, final Message message) throws Exception {
|
public void send(final ProducerBrokerExchange context, final Message message) throws Exception {
|
||||||
MessageEvaluationContext messageContext = null;
|
MessageEvaluationContext messageContext = null;
|
||||||
|
|
||||||
|
@ -113,19 +110,13 @@ public class CompositeDestinationFilter extends DestinationFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doForward(ProducerBrokerExchange context, Message message, Broker regionBroker, ActiveMQDestination destination) throws Exception {
|
private void doForward(ProducerBrokerExchange context, Message message, Broker regionBroker, ActiveMQDestination destination) throws Exception {
|
||||||
Message forwarded_message;
|
Message forwardedMessage = message.copy();
|
||||||
|
|
||||||
if (copyMessage) {
|
forwardedMessage.setOriginalDestination( message.getDestination() );
|
||||||
forwarded_message = message.copy();
|
forwardedMessage.setDestination(destination);
|
||||||
forwarded_message.setOriginalDestination( message.getDestination() );
|
|
||||||
forwarded_message.setDestination(destination);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
forwarded_message = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send it back through the region broker for routing.
|
// Send it back through the region broker for routing.
|
||||||
context.setMutable(true);
|
context.setMutable(true);
|
||||||
regionBroker.send(context, forwarded_message);
|
regionBroker.send(context, forwardedMessage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue