From 3dd6575ac89f2a8416ddd419cc878bf4a20d6b04 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Thu, 21 Aug 2008 20:57:56 +0000 Subject: [PATCH] added a fix for AMQ-1904 to expose the user properties as TabularData so JMX tools can browse all of the information in a message git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@687861 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/jmx/CompositeDataConstants.java | 41 ++++++ .../broker/jmx/CompositeDataHelper.java | 49 +++++++ .../activemq/broker/jmx/DestinationView.java | 4 + .../activemq/broker/jmx/OpenTypeSupport.java | 138 ++++++++++++++++-- .../apache/activemq/broker/jmx/MBeanTest.java | 79 ++++++++++ 5 files changed, 296 insertions(+), 15 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java new file mode 100644 index 0000000000..0c8694feba --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataConstants.java @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +/** + * @version $Revision: 1.1 $ + */ +public interface CompositeDataConstants { + String PROPERTIES = "PropertiesText"; + String JMSXGROUP_SEQ = "JMSXGroupSeq"; + String JMSXGROUP_ID = "JMSXGroupID"; + String BODY_LENGTH = "BodyLength"; + String BODY_PREVIEW = "BodyPreview"; + String CONTENT_MAP = "ContentMap"; + String MESSAGE_TEXT = "Text"; + + // User properties + String STRING_PROPERTIES = "StringProperties"; + String BOOLEAN_PROPERTIES = "BooleanProperties"; + String BYTE_PROPERTIES = "ByteProperties"; + String SHORT_PROPERTIES = "ShortProperties"; + String INT_PROPERTIES = "IntProperties"; + String LONG_PROPERTIES = "LongProperties"; + String FLOAT_PROPERTIES = "FloatProperties"; + String DOUBLE_PROPERTIES = "DoubleProperties"; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java new file mode 100644 index 0000000000..33821b2c83 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/CompositeDataHelper.java @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.jmx; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * @version $Revision: 1.1 $ + */ +public class CompositeDataHelper { + + /** + * Extracts the named TabularData field from the CompositeData and converts it to a Map + * which is the method used to get the typesafe user properties. + */ + public static Map getTabularMap(CompositeData cdata, String fieldName) { + Map map = new HashMap(); + Object tabularObject = cdata.get(fieldName); + if (tabularObject instanceof TabularData) { + TabularData tabularData = (TabularData) tabularObject; + Collection values = tabularData.values(); + for (CompositeData compositeData : values) { + Object key = compositeData.get("key"); + Object value = compositeData.get("value"); + map.put(key, value); + } + } + return map; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 209169d87c..5b50f7592f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -156,6 +156,10 @@ public class DestinationView implements DestinationViewMBean { } } catch (Throwable e) { + // TODO DELETE ME + System.out.println(e); + e.printStackTrace(); + // TODO DELETE ME LOG.warn("exception browsing destination", e); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java index df1711219e..0a2a46f57f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -32,6 +33,8 @@ import javax.management.openmbean.CompositeType; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.OpenType; import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularType; +import javax.management.openmbean.TabularDataSupport; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQMapMessage; @@ -40,6 +43,7 @@ import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQStreamMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.Message; +import static org.apache.activemq.broker.jmx.CompositeDataConstants.*; public final class OpenTypeSupport { @@ -95,6 +99,14 @@ public final class OpenTypeSupport { } static class MessageOpenTypeFactory extends AbstractOpenTypeFactory { + protected TabularType stringPropertyTabularType; + protected TabularType booleanPropertyTabularType; + protected TabularType bytePropertyTabularType; + protected TabularType shortPropertyTabularType; + protected TabularType intPropertyTabularType; + protected TabularType longPropertyTabularType; + protected TabularType floatPropertyTabularType; + protected TabularType doublePropertyTabularType; protected String getTypeName() { return ActiveMQMessage.class.getName(); @@ -112,7 +124,28 @@ public final class OpenTypeSupport { addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER); addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN); addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE); - addItem("Properties", "Properties", SimpleType.STRING); + addItem(JMSXGROUP_ID, "Message Group ID", SimpleType.STRING); + addItem(JMSXGROUP_SEQ, "Message Group Sequence Number", SimpleType.INTEGER); + addItem(CompositeDataConstants.PROPERTIES, "User Properties Text", SimpleType.STRING); + + // now lets expose the type safe properties + stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); + booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); + bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); + shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); + intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); + longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); + floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); + doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); + + addItem(CompositeDataConstants.STRING_PROPERTIES, "User String Properties", stringPropertyTabularType); + addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, "User Boolean Properties", booleanPropertyTabularType); + addItem(CompositeDataConstants.BYTE_PROPERTIES, "User Byte Properties", bytePropertyTabularType); + addItem(CompositeDataConstants.SHORT_PROPERTIES, "User Short Properties", shortPropertyTabularType); + addItem(CompositeDataConstants.INT_PROPERTIES, "User Integer Properties", intPropertyTabularType); + addItem(CompositeDataConstants.LONG_PROPERTIES, "User Long Properties", longPropertyTabularType); + addItem(CompositeDataConstants.FLOAT_PROPERTIES, "User Float Properties", floatPropertyTabularType); + addItem(CompositeDataConstants.DOUBLE_PROPERTIES, "User Double Properties", doublePropertyTabularType); } public Map getFields(Object o) throws OpenDataException { @@ -128,25 +161,98 @@ public final class OpenTypeSupport { rc.put("JMSPriority", Integer.valueOf(m.getJMSPriority())); rc.put("JMSRedelivered", Boolean.valueOf(m.getJMSRedelivered())); rc.put("JMSTimestamp", new Date(m.getJMSTimestamp())); + rc.put(JMSXGROUP_ID, m.getGroupID()); + rc.put(JMSXGROUP_SEQ, m.getGroupSequence()); try { - rc.put("Properties", "" + m.getProperties()); + rc.put(CompositeDataConstants.PROPERTIES, "" + m.getProperties()); } catch (IOException e) { - rc.put("Properties", ""); + rc.put(CompositeDataConstants.PROPERTIES, ""); + } + + try { + rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(m, stringPropertyTabularType, String.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(m, booleanPropertyTabularType, Boolean.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(m, bytePropertyTabularType, Byte.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(m, shortPropertyTabularType, Short.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(m, intPropertyTabularType, Integer.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(m, longPropertyTabularType, Long.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(m, floatPropertyTabularType, Float.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(m, doublePropertyTabularType, Double.class)); + } catch (IOException e) { + rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType)); } return rc; } + + + protected TabularType createTabularType(Class type, OpenType openType) throws OpenDataException { + String typeName = "java.util.Map"; + String[] keyValue = new String[]{"key", "value"}; + OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; + CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); + return new TabularType(typeName, typeName, rowType, new String[]{"key"}); + } + + protected TabularDataSupport createTabularData(ActiveMQMessage m, TabularType type, Class valueType) throws IOException, OpenDataException { + TabularDataSupport answer = new TabularDataSupport(type); + Set> entries = m.getProperties().entrySet(); + for (Map.Entry entry : entries) { + Object value = entry.getValue(); + if (valueType.isInstance(value)) { + CompositeDataSupport compositeData = createTabularRowValue(type, entry.getKey(), value); + answer.put(compositeData); + } + } + return answer; + } + + protected CompositeDataSupport createTabularRowValue(TabularType type, String key, Object value) throws OpenDataException { + Map fields = new HashMap(); + fields.put("key", key); + fields.put("value", value); + return new CompositeDataSupport(type.getRowType(), fields); + } } static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory { + protected String getTypeName() { return ActiveMQBytesMessage.class.getName(); } protected void init() throws OpenDataException { super.init(); - addItem("BodyLength", "Body length", SimpleType.LONG); - addItem("BodyPreview", "Body preview", new ArrayType(1, SimpleType.BYTE)); + addItem(BODY_LENGTH, "Body length", SimpleType.LONG); + addItem(BODY_PREVIEW, "Body preview", new ArrayType(1, SimpleType.BYTE)); } public Map getFields(Object o) throws OpenDataException { @@ -155,9 +261,9 @@ public final class OpenTypeSupport { long length = 0; try { length = m.getBodyLength(); - rc.put("BodyLength", Long.valueOf(length)); + rc.put(BODY_LENGTH, Long.valueOf(length)); } catch (JMSException e) { - rc.put("BodyLength", Long.valueOf(0)); + rc.put(BODY_LENGTH, Long.valueOf(0)); } try { byte preview[] = new byte[(int)Math.min(length, 255)]; @@ -171,9 +277,9 @@ public final class OpenTypeSupport { data[i] = new Byte(preview[i]); } - rc.put("BodyPreview", data); + rc.put(BODY_PREVIEW, data); } catch (JMSException e) { - rc.put("BodyPreview", new byte[] {}); + rc.put(BODY_PREVIEW, new byte[] {}); } return rc; } @@ -181,22 +287,23 @@ public final class OpenTypeSupport { } static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory { + protected String getTypeName() { return ActiveMQMapMessage.class.getName(); } protected void init() throws OpenDataException { super.init(); - addItem("ContentMap", "Content map", SimpleType.STRING); + addItem(CONTENT_MAP, "Content map", SimpleType.STRING); } public Map getFields(Object o) throws OpenDataException { ActiveMQMapMessage m = (ActiveMQMapMessage)o; Map rc = super.getFields(o); try { - rc.put("ContentMap", "" + m.getContentMap()); + rc.put(CONTENT_MAP, "" + m.getContentMap()); } catch (JMSException e) { - rc.put("ContentMap", ""); + rc.put(CONTENT_MAP, ""); } return rc; } @@ -233,22 +340,23 @@ public final class OpenTypeSupport { } static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { + protected String getTypeName() { return ActiveMQTextMessage.class.getName(); } protected void init() throws OpenDataException { super.init(); - addItem("Text", "Text", SimpleType.STRING); + addItem(MESSAGE_TEXT, MESSAGE_TEXT, SimpleType.STRING); } public Map getFields(Object o) throws OpenDataException { ActiveMQTextMessage m = (ActiveMQTextMessage)o; Map rc = super.getFields(o); try { - rc.put("Text", "" + m.getText()); + rc.put(MESSAGE_TEXT, "" + m.getText()); } catch (JMSException e) { - rc.put("Text", ""); + rc.put(MESSAGE_TEXT, ""); } return rc; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 1ea8ec1b23..c5198953ca 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.broker.jmx; import java.io.BufferedReader; import java.io.InputStreamReader; +import java.util.Map; +import java.util.HashMap; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageProducer; @@ -30,6 +32,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; import junit.textui.TestRunner; import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.BaseDestination; import org.apache.commons.logging.Log; @@ -72,6 +75,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { // test all the various MBeans now we have a producer, consumer and // messages on a queue + assertSendViaMBean(); assertQueueBrowseWorks(); assertCreateAndDestroyDurableSubscriptions(); assertConsumerCounts(); @@ -126,6 +130,77 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); } + + protected void assertSendViaMBean() throws Exception { + String queueName = getDestinationString() + ".SendMBBean"; + + ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); + echo("Create QueueView MBean..."); + BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + broker.addQueue(queueName); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + queueName + ",BrokerName=localhost"); + + echo("Create QueueView MBean..."); + QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + int count = 5; + for (int i = 0; i < count; i++) { + String body = "message:" + i; + + Map headers = new HashMap(); + headers.put("JMSCorrelationID", "MyCorrId"); + headers.put("JMSDeliveryMode", Boolean.TRUE); + headers.put("JMSXGroupID", "MyGroupID"); + headers.put("JMSXGroupSeq", 1234); + headers.put("JMSPriority", i); + headers.put("JMSType", "MyType"); + headers.put("MyHeader", i); + headers.put("MyStringHeader", "StringHeader" + i); + + proxy.sendTextMessage(headers, body); + } + + CompositeData[] compdatalist = proxy.browse(); + if (compdatalist.length == 0) { + fail("There is no message in the queue:"); + } + String[] messageIDs = new String[compdatalist.length]; + + for (int i = 0; i < compdatalist.length; i++) { + CompositeData cdata = compdatalist[i]; + + if (i == 0) { + echo("Columns: " + cdata.getCompositeType().keySet()); + } + + assertComplexData(cdata, "JMSCorrelationID", "MyCorrId"); + assertComplexData(cdata, "JMSPriority", i); + assertComplexData(cdata, "JMSType", "MyType"); + assertComplexData(cdata, "JMSCorrelationID", "MyCorrId"); + assertComplexData(cdata, "PropertiesText", "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}"); + + Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); + assertEquals("intProperties size()", 1, intProperties.size()); + assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader")); + + Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES); + assertEquals("stringProperties size()", 1, stringProperties.size()); + assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader")); + + assertComplexData(cdata, "JMSXGroupSeq", 1234); + assertComplexData(cdata, "JMSXGroupID", "MyGroupID"); + assertComplexData(cdata, "Text", "message:" + i); + + } + } + + protected void assertComplexData(CompositeData cdata, String name, Object expected) { + Object value = cdata.get(name); + assertEquals("CData field: " + name, expected, value); + } + + protected void assertQueueBrowseWorks() throws Exception { Integer mbeancnt = mbeanServer.getMBeanCount(); echo("Mbean count :" + mbeancnt); @@ -309,6 +384,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = session.createTextMessage("Message: " + i); message.setIntProperty("counter", i); + message.setJMSCorrelationID("MyCorrelationID"); + message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo")); + message.setJMSType("MyType"); + message.setJMSPriority(5); producer.send(message); } Thread.sleep(1000);