mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4539 - add JMSActiveMQBrokerPath string property and brokerPath attribute via jmx message view, the list length is the current number of network hops
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1483600 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5bb563dd1c
commit
6f124c16f7
|
@ -24,6 +24,7 @@ public interface CompositeDataConstants {
|
|||
String PROPERTIES = "PropertiesText";
|
||||
String JMSXGROUP_SEQ = "JMSXGroupSeq";
|
||||
String JMSXGROUP_ID = "JMSXGroupID";
|
||||
String BROKER_PATH = "BrokerPath";
|
||||
String BODY_LENGTH = "BodyLength";
|
||||
String BODY_PREVIEW = "BodyPreview";
|
||||
String CONTENT_MAP = "ContentMap";
|
||||
|
|
|
@ -40,6 +40,7 @@ import javax.management.openmbean.TabularDataSupport;
|
|||
import javax.management.openmbean.TabularType;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -129,6 +130,7 @@ public final class OpenTypeSupport {
|
|||
addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
|
||||
addItem(CompositeDataConstants.JMSXGROUP_ID, "Message Group ID", SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.JMSXGROUP_SEQ, "Message Group Sequence Number", SimpleType.INTEGER);
|
||||
addItem(CompositeDataConstants.BROKER_PATH, "Brokers traversed", SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.ORIGINAL_DESTINATION, "Original Destination Before Senting To DLQ", SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.PROPERTIES, "User Properties Text", SimpleType.STRING);
|
||||
|
||||
|
@ -168,6 +170,7 @@ public final class OpenTypeSupport {
|
|||
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
|
||||
rc.put(CompositeDataConstants.JMSXGROUP_ID, m.getGroupID());
|
||||
rc.put(CompositeDataConstants.JMSXGROUP_SEQ, m.getGroupSequence());
|
||||
rc.put(CompositeDataConstants.BROKER_PATH, Arrays.toString(m.getBrokerPath()));
|
||||
rc.put(CompositeDataConstants.ORIGINAL_DESTINATION, toString(m.getOriginalDestination()));
|
||||
try {
|
||||
rc.put(CompositeDataConstants.PROPERTIES, "" + m.getProperties());
|
||||
|
|
|
@ -46,6 +46,8 @@ import org.apache.activemq.util.TypeConversionSupport;
|
|||
public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
|
||||
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
|
||||
public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
|
||||
public static final String BROKER_PATH_PROPERTY = "JMSActiveMQBrokerPath";
|
||||
|
||||
private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
|
||||
|
||||
protected transient Callback acknowledgeCallback;
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.activemq.filter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -159,6 +160,12 @@ public class PropertyExpression implements Expression {
|
|||
return Long.valueOf(message.getBrokerOutTime());
|
||||
}
|
||||
});
|
||||
JMS_PROPERTY_EXPRESSIONS.put("JMSActiveMQBrokerPath", new SubExpression() {
|
||||
|
||||
public Object evaluate(Message message) {
|
||||
return Arrays.toString(message.getBrokerPath());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private final String name;
|
||||
|
|
|
@ -41,6 +41,7 @@ import javax.jms.QueueBrowser;
|
|||
import javax.jms.Session;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerTestSupport;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
|
|||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
|
@ -223,6 +225,8 @@ public class BrokerNetworkWithStuckMessagesTest {
|
|||
assertNotNull(message1);
|
||||
LOG.info("on remote, got: " + message1.getMessageId());
|
||||
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
|
||||
assertTrue("JMSActiveMQBrokerPath property present and correct",
|
||||
((ActiveMQMessage)message1).getStringProperty(ActiveMQMessage.BROKER_PATH_PROPERTY).contains(localBroker.getBroker().getBrokerId().toString()));
|
||||
}
|
||||
|
||||
// Ensure that there are zero messages on the local broker. This tells
|
||||
|
@ -273,7 +277,10 @@ public class BrokerNetworkWithStuckMessagesTest {
|
|||
messages = browseQueueWithJmx(remoteBroker);
|
||||
assertEquals(5, messages.length);
|
||||
|
||||
LOG.info("Messages now stuck on remote");
|
||||
assertTrue("can see broker path property",
|
||||
((String)((CompositeData)messages[1]).get("BrokerPath")).contains(localBroker.getBroker().getBrokerId().toString()));
|
||||
|
||||
LOG.info("Messages now stuck on remote");
|
||||
|
||||
// receive again on the origin broker
|
||||
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationInfo1);
|
||||
|
|
Loading…
Reference in New Issue