mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6070 - rework for virtual topic case, use the destination from the transient region destination rather than the message, such that consumer queue advisories work for delivered etc
(cherry picked from commit 179dc3acb2
)
This commit is contained in:
parent
1ebfa9ade2
commit
558dcc0479
|
@ -426,9 +426,10 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
super.messageExpired(context, messageReference, subscription);
|
super.messageExpired(context, messageReference, subscription);
|
||||||
try {
|
try {
|
||||||
if (!messageReference.isAdvisory()) {
|
if (!messageReference.isAdvisory()) {
|
||||||
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
|
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
||||||
|
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());
|
||||||
Message payload = messageReference.getMessage().copy();
|
Message payload = messageReference.getMessage().copy();
|
||||||
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
|
if (!baseDestination.isIncludeBodyForAdvisory()) {
|
||||||
payload.clearBody();
|
payload.clearBody();
|
||||||
}
|
}
|
||||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||||
|
@ -445,17 +446,15 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
super.messageConsumed(context, messageReference);
|
super.messageConsumed(context, messageReference);
|
||||||
try {
|
try {
|
||||||
if (!messageReference.isAdvisory()) {
|
if (!messageReference.isAdvisory()) {
|
||||||
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
|
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
||||||
|
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(baseDestination.getActiveMQDestination());
|
||||||
Message payload = messageReference.getMessage().copy();
|
Message payload = messageReference.getMessage().copy();
|
||||||
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
|
if (!baseDestination.isIncludeBodyForAdvisory()) {
|
||||||
payload.clearBody();
|
payload.clearBody();
|
||||||
}
|
}
|
||||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
|
||||||
ActiveMQDestination destination = payload.getDestination();
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
|
||||||
if (destination != null) {
|
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName());
|
|
||||||
}
|
|
||||||
fireAdvisory(context, topic, payload, null, advisoryMessage);
|
fireAdvisory(context, topic, payload, null, advisoryMessage);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -468,17 +467,15 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
super.messageDelivered(context, messageReference);
|
super.messageDelivered(context, messageReference);
|
||||||
try {
|
try {
|
||||||
if (!messageReference.isAdvisory()) {
|
if (!messageReference.isAdvisory()) {
|
||||||
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
|
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
||||||
|
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(baseDestination.getActiveMQDestination());
|
||||||
Message payload = messageReference.getMessage().copy();
|
Message payload = messageReference.getMessage().copy();
|
||||||
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
|
if (!baseDestination.isIncludeBodyForAdvisory()) {
|
||||||
payload.clearBody();
|
payload.clearBody();
|
||||||
}
|
}
|
||||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
|
||||||
ActiveMQDestination destination = payload.getDestination();
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
|
||||||
if (destination != null) {
|
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName());
|
|
||||||
}
|
|
||||||
fireAdvisory(context, topic, payload, null, advisoryMessage);
|
fireAdvisory(context, topic, payload, null, advisoryMessage);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -491,9 +488,10 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
super.messageDiscarded(context, sub, messageReference);
|
super.messageDiscarded(context, sub, messageReference);
|
||||||
try {
|
try {
|
||||||
if (!messageReference.isAdvisory()) {
|
if (!messageReference.isAdvisory()) {
|
||||||
ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
|
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
||||||
|
ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(baseDestination.getActiveMQDestination());
|
||||||
Message payload = messageReference.getMessage().copy();
|
Message payload = messageReference.getMessage().copy();
|
||||||
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
|
if (!baseDestination.isIncludeBodyForAdvisory()) {
|
||||||
payload.clearBody();
|
payload.clearBody();
|
||||||
}
|
}
|
||||||
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
|
||||||
|
@ -502,10 +500,8 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
}
|
}
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
|
||||||
ActiveMQDestination destination = payload.getDestination();
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, baseDestination.getActiveMQDestination().getQualifiedName());
|
||||||
if (destination != null) {
|
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, destination.getQualifiedName());
|
|
||||||
}
|
|
||||||
fireAdvisory(context, topic, payload, null, advisoryMessage);
|
fireAdvisory(context, topic, payload, null, advisoryMessage);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -716,9 +712,10 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
if (wasDLQd) {
|
if (wasDLQd) {
|
||||||
try {
|
try {
|
||||||
if (!messageReference.isAdvisory()) {
|
if (!messageReference.isAdvisory()) {
|
||||||
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
|
BaseDestination baseDestination = (BaseDestination) messageReference.getMessage().getRegionDestination();
|
||||||
|
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(baseDestination.getActiveMQDestination());
|
||||||
Message payload = messageReference.getMessage().copy();
|
Message payload = messageReference.getMessage().copy();
|
||||||
if (!isIncludeBodyForAdvisory(messageReference.getMessage().getDestination())) {
|
if (!baseDestination.isIncludeBodyForAdvisory()) {
|
||||||
payload.clearBody();
|
payload.clearBody();
|
||||||
}
|
}
|
||||||
fireAdvisory(context, topic, payload);
|
fireAdvisory(context, topic, payload);
|
||||||
|
@ -773,12 +770,6 @@ public class AdvisoryBroker extends BrokerFilter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean isIncludeBodyForAdvisory(ActiveMQDestination activemqDestination) {
|
|
||||||
Destination destination = next.getDestinationMap(activemqDestination).get(activemqDestination);
|
|
||||||
return (destination instanceof BaseDestination &&
|
|
||||||
((BaseDestination) destination).isIncludeBodyForAdvisory()) ? true : false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleFireFailure(String message, Throwable cause) {
|
private void handleFireFailure(String message, Throwable cause) {
|
||||||
LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
|
LOG.warn("Failed to fire {} advisory, reason: {}", message, cause);
|
||||||
LOG.debug("{} detail: {}", message, cause);
|
LOG.debug("{} detail: {}", message, cause);
|
||||||
|
|
|
@ -356,6 +356,8 @@ public class Topic extends BaseDestination implements Task {
|
||||||
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
|
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
|
||||||
&& !context.isInRecoveryMode();
|
&& !context.isInRecoveryMode();
|
||||||
|
|
||||||
|
message.setRegionDestination(this);
|
||||||
|
|
||||||
// There is delay between the client sending it and it arriving at the
|
// There is delay between the client sending it and it arriving at the
|
||||||
// destination.. it may have expired.
|
// destination.. it may have expired.
|
||||||
if (message.isExpired()) {
|
if (message.isExpired()) {
|
||||||
|
@ -494,7 +496,6 @@ public class Topic extends BaseDestination implements Task {
|
||||||
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
|
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
|
||||||
throws IOException, Exception {
|
throws IOException, Exception {
|
||||||
final ConnectionContext context = producerExchange.getConnectionContext();
|
final ConnectionContext context = producerExchange.getConnectionContext();
|
||||||
message.setRegionDestination(this);
|
|
||||||
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
|
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
|
||||||
Future<Object> result = null;
|
Future<Object> result = null;
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
@ -44,9 +45,10 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
@ -67,7 +69,7 @@ public class AdvisoryTests {
|
||||||
protected final int EXPIRE_MESSAGE_PERIOD = 10000;
|
protected final int EXPIRE_MESSAGE_PERIOD = 10000;
|
||||||
|
|
||||||
|
|
||||||
@Parameters
|
@Parameters(name = "includeBodyForAdvisory={0}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
return Arrays.asList(new Object[][] {
|
return Arrays.asList(new Object[][] {
|
||||||
// Include the full body of the message
|
// Include the full body of the message
|
||||||
|
@ -293,6 +295,47 @@ public class AdvisoryTests {
|
||||||
assertIncludeBodyForAdvisory(payload);
|
assertIncludeBodyForAdvisory(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMessageDeliveryVTAdvisory() throws Exception {
|
||||||
|
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQTopic vt = new ActiveMQTopic("VirtualTopic.TEST");
|
||||||
|
|
||||||
|
ActiveMQQueue a = new ActiveMQQueue("Consumer.A.VirtualTopic.TEST");
|
||||||
|
MessageConsumer consumer = s.createConsumer(a);
|
||||||
|
|
||||||
|
ActiveMQQueue b = new ActiveMQQueue("Consumer.B.VirtualTopic.TEST");
|
||||||
|
MessageConsumer consumerB = s.createConsumer(b);
|
||||||
|
|
||||||
|
assertNotNull(consumer);
|
||||||
|
assertNotNull(consumerB);
|
||||||
|
|
||||||
|
HashSet<String> dests = new HashSet<String>();
|
||||||
|
dests.add(vt.getQualifiedName());
|
||||||
|
dests.add(a.getQualifiedName());
|
||||||
|
dests.add(b.getQualifiedName());
|
||||||
|
|
||||||
|
|
||||||
|
Topic advisoryTopic = new ActiveMQTopic(AdvisorySupport.MESSAGE_DELIVERED_TOPIC_PREFIX + ">");
|
||||||
|
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
|
||||||
|
|
||||||
|
// throw messages at the vt
|
||||||
|
MessageProducer producer = s.createProducer(vt);
|
||||||
|
|
||||||
|
BytesMessage m = s.createBytesMessage();
|
||||||
|
m.writeBytes(new byte[1024]);
|
||||||
|
producer.send(m);
|
||||||
|
|
||||||
|
Message msg = null;
|
||||||
|
while ((msg = advisoryConsumer.receive(1000)) != null) {
|
||||||
|
ActiveMQMessage message = (ActiveMQMessage) msg;
|
||||||
|
String dest = (String) message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION);
|
||||||
|
dests.remove(dest);
|
||||||
|
assertIncludeBodyForAdvisory((ActiveMQMessage) message.getDataStructure());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("Got delivered for all: " + dests, dests.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
if (broker == null) {
|
if (broker == null) {
|
||||||
|
|
Loading…
Reference in New Issue