Add MessageId and original destination info to the advisory message
properties.
This commit is contained in:
Timothy Bish 2014-06-04 13:30:42 -04:00
parent be0311bea0
commit 6c703ac6ee
3 changed files with 22 additions and 3 deletions

View File

@ -349,7 +349,13 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic, payload);
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
ActiveMQDestination destination = payload.getDestination();
if (destination != null) {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
}
fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("consumed", e);
@ -364,7 +370,13 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
fireAdvisory(context, topic, payload);
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
ActiveMQDestination destination = payload.getDestination();
if (destination != null) {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
}
fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("delivered", e);
@ -383,7 +395,12 @@ public class AdvisoryBroker extends BrokerFilter {
if (sub instanceof TopicSubscription) {
advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
ActiveMQDestination destination = payload.getDestination();
if (destination != null) {
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION, payload.getMessageId().toString());
}
fireAdvisory(context, topic, payload, null, advisoryMessage);
}
} catch (Exception e) {

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@ -61,6 +62,7 @@ public final class AdvisorySupport {
public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
public static final String MSG_PROPERTY_DESTINATION = "orignalDestination";
public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";

View File

@ -158,7 +158,7 @@ public class AdvisoryTempDestinationTests extends TestCase {
producer.send(m);
}
Message msg = advisoryConsumer.receive(2000);
Message msg = advisoryConsumer.receive(5000);
assertNotNull(msg);
}