mirror of https://github.com/apache/activemq.git
added a fix for AMQ-1904 to expose the user properties as TabularData so JMX tools can browse all of the information in a message
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@687861 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c32abd791
commit
3dd6575ac8
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1 $
|
||||||
|
*/
|
||||||
|
public interface CompositeDataConstants {
|
||||||
|
String PROPERTIES = "PropertiesText";
|
||||||
|
String JMSXGROUP_SEQ = "JMSXGroupSeq";
|
||||||
|
String JMSXGROUP_ID = "JMSXGroupID";
|
||||||
|
String BODY_LENGTH = "BodyLength";
|
||||||
|
String BODY_PREVIEW = "BodyPreview";
|
||||||
|
String CONTENT_MAP = "ContentMap";
|
||||||
|
String MESSAGE_TEXT = "Text";
|
||||||
|
|
||||||
|
// User properties
|
||||||
|
String STRING_PROPERTIES = "StringProperties";
|
||||||
|
String BOOLEAN_PROPERTIES = "BooleanProperties";
|
||||||
|
String BYTE_PROPERTIES = "ByteProperties";
|
||||||
|
String SHORT_PROPERTIES = "ShortProperties";
|
||||||
|
String INT_PROPERTIES = "IntProperties";
|
||||||
|
String LONG_PROPERTIES = "LongProperties";
|
||||||
|
String FLOAT_PROPERTIES = "FloatProperties";
|
||||||
|
String DOUBLE_PROPERTIES = "DoubleProperties";
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
|
import javax.management.openmbean.CompositeData;
|
||||||
|
import javax.management.openmbean.TabularData;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision: 1.1 $
|
||||||
|
*/
|
||||||
|
public class CompositeDataHelper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts the named TabularData field from the CompositeData and converts it to a Map
|
||||||
|
* which is the method used to get the typesafe user properties.
|
||||||
|
*/
|
||||||
|
public static Map getTabularMap(CompositeData cdata, String fieldName) {
|
||||||
|
Map map = new HashMap();
|
||||||
|
Object tabularObject = cdata.get(fieldName);
|
||||||
|
if (tabularObject instanceof TabularData) {
|
||||||
|
TabularData tabularData = (TabularData) tabularObject;
|
||||||
|
Collection<CompositeData> values = tabularData.values();
|
||||||
|
for (CompositeData compositeData : values) {
|
||||||
|
Object key = compositeData.get("key");
|
||||||
|
Object value = compositeData.get("value");
|
||||||
|
map.put(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
}
|
|
@ -156,6 +156,10 @@ public class DestinationView implements DestinationViewMBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
// TODO DELETE ME
|
||||||
|
System.out.println(e);
|
||||||
|
e.printStackTrace();
|
||||||
|
// TODO DELETE ME
|
||||||
LOG.warn("exception browsing destination", e);
|
LOG.warn("exception browsing destination", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
@ -32,6 +33,8 @@ import javax.management.openmbean.CompositeType;
|
||||||
import javax.management.openmbean.OpenDataException;
|
import javax.management.openmbean.OpenDataException;
|
||||||
import javax.management.openmbean.OpenType;
|
import javax.management.openmbean.OpenType;
|
||||||
import javax.management.openmbean.SimpleType;
|
import javax.management.openmbean.SimpleType;
|
||||||
|
import javax.management.openmbean.TabularType;
|
||||||
|
import javax.management.openmbean.TabularDataSupport;
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||||
|
@ -40,6 +43,7 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
|
||||||
import org.apache.activemq.command.ActiveMQStreamMessage;
|
import org.apache.activemq.command.ActiveMQStreamMessage;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
import static org.apache.activemq.broker.jmx.CompositeDataConstants.*;
|
||||||
|
|
||||||
public final class OpenTypeSupport {
|
public final class OpenTypeSupport {
|
||||||
|
|
||||||
|
@ -95,6 +99,14 @@ public final class OpenTypeSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
|
static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
|
||||||
|
protected TabularType stringPropertyTabularType;
|
||||||
|
protected TabularType booleanPropertyTabularType;
|
||||||
|
protected TabularType bytePropertyTabularType;
|
||||||
|
protected TabularType shortPropertyTabularType;
|
||||||
|
protected TabularType intPropertyTabularType;
|
||||||
|
protected TabularType longPropertyTabularType;
|
||||||
|
protected TabularType floatPropertyTabularType;
|
||||||
|
protected TabularType doublePropertyTabularType;
|
||||||
|
|
||||||
protected String getTypeName() {
|
protected String getTypeName() {
|
||||||
return ActiveMQMessage.class.getName();
|
return ActiveMQMessage.class.getName();
|
||||||
|
@ -112,7 +124,28 @@ public final class OpenTypeSupport {
|
||||||
addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER);
|
addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER);
|
||||||
addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN);
|
addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN);
|
||||||
addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
|
addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
|
||||||
addItem("Properties", "Properties", SimpleType.STRING);
|
addItem(JMSXGROUP_ID, "Message Group ID", SimpleType.STRING);
|
||||||
|
addItem(JMSXGROUP_SEQ, "Message Group Sequence Number", SimpleType.INTEGER);
|
||||||
|
addItem(CompositeDataConstants.PROPERTIES, "User Properties Text", SimpleType.STRING);
|
||||||
|
|
||||||
|
// now lets expose the type safe properties
|
||||||
|
stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
|
||||||
|
booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
|
||||||
|
bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
|
||||||
|
shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
|
||||||
|
intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
|
||||||
|
longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
|
||||||
|
floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
|
||||||
|
doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
|
||||||
|
|
||||||
|
addItem(CompositeDataConstants.STRING_PROPERTIES, "User String Properties", stringPropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, "User Boolean Properties", booleanPropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.BYTE_PROPERTIES, "User Byte Properties", bytePropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.SHORT_PROPERTIES, "User Short Properties", shortPropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.INT_PROPERTIES, "User Integer Properties", intPropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.LONG_PROPERTIES, "User Long Properties", longPropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.FLOAT_PROPERTIES, "User Float Properties", floatPropertyTabularType);
|
||||||
|
addItem(CompositeDataConstants.DOUBLE_PROPERTIES, "User Double Properties", doublePropertyTabularType);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
||||||
|
@ -128,25 +161,98 @@ public final class OpenTypeSupport {
|
||||||
rc.put("JMSPriority", Integer.valueOf(m.getJMSPriority()));
|
rc.put("JMSPriority", Integer.valueOf(m.getJMSPriority()));
|
||||||
rc.put("JMSRedelivered", Boolean.valueOf(m.getJMSRedelivered()));
|
rc.put("JMSRedelivered", Boolean.valueOf(m.getJMSRedelivered()));
|
||||||
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
|
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
|
||||||
|
rc.put(JMSXGROUP_ID, m.getGroupID());
|
||||||
|
rc.put(JMSXGROUP_SEQ, m.getGroupSequence());
|
||||||
try {
|
try {
|
||||||
rc.put("Properties", "" + m.getProperties());
|
rc.put(CompositeDataConstants.PROPERTIES, "" + m.getProperties());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
rc.put("Properties", "");
|
rc.put(CompositeDataConstants.PROPERTIES, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(m, stringPropertyTabularType, String.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(m, booleanPropertyTabularType, Boolean.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(m, bytePropertyTabularType, Byte.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(m, shortPropertyTabularType, Short.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(m, intPropertyTabularType, Integer.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(m, longPropertyTabularType, Long.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(m, floatPropertyTabularType, Float.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(m, doublePropertyTabularType, Double.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType));
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
|
||||||
|
String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
|
||||||
|
String[] keyValue = new String[]{"key", "value"};
|
||||||
|
OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
|
||||||
|
CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
|
||||||
|
return new TabularType(typeName, typeName, rowType, new String[]{"key"});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TabularDataSupport createTabularData(ActiveMQMessage m, TabularType type, Class valueType) throws IOException, OpenDataException {
|
||||||
|
TabularDataSupport answer = new TabularDataSupport(type);
|
||||||
|
Set<Map.Entry<String,Object>> entries = m.getProperties().entrySet();
|
||||||
|
for (Map.Entry<String, Object> entry : entries) {
|
||||||
|
Object value = entry.getValue();
|
||||||
|
if (valueType.isInstance(value)) {
|
||||||
|
CompositeDataSupport compositeData = createTabularRowValue(type, entry.getKey(), value);
|
||||||
|
answer.put(compositeData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return answer;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected CompositeDataSupport createTabularRowValue(TabularType type, String key, Object value) throws OpenDataException {
|
||||||
|
Map<String,Object> fields = new HashMap<String, Object>();
|
||||||
|
fields.put("key", key);
|
||||||
|
fields.put("value", value);
|
||||||
|
return new CompositeDataSupport(type.getRowType(), fields);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||||
|
|
||||||
|
|
||||||
protected String getTypeName() {
|
protected String getTypeName() {
|
||||||
return ActiveMQBytesMessage.class.getName();
|
return ActiveMQBytesMessage.class.getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void init() throws OpenDataException {
|
protected void init() throws OpenDataException {
|
||||||
super.init();
|
super.init();
|
||||||
addItem("BodyLength", "Body length", SimpleType.LONG);
|
addItem(BODY_LENGTH, "Body length", SimpleType.LONG);
|
||||||
addItem("BodyPreview", "Body preview", new ArrayType(1, SimpleType.BYTE));
|
addItem(BODY_PREVIEW, "Body preview", new ArrayType(1, SimpleType.BYTE));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
||||||
|
@ -155,9 +261,9 @@ public final class OpenTypeSupport {
|
||||||
long length = 0;
|
long length = 0;
|
||||||
try {
|
try {
|
||||||
length = m.getBodyLength();
|
length = m.getBodyLength();
|
||||||
rc.put("BodyLength", Long.valueOf(length));
|
rc.put(BODY_LENGTH, Long.valueOf(length));
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
rc.put("BodyLength", Long.valueOf(0));
|
rc.put(BODY_LENGTH, Long.valueOf(0));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
byte preview[] = new byte[(int)Math.min(length, 255)];
|
byte preview[] = new byte[(int)Math.min(length, 255)];
|
||||||
|
@ -171,9 +277,9 @@ public final class OpenTypeSupport {
|
||||||
data[i] = new Byte(preview[i]);
|
data[i] = new Byte(preview[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.put("BodyPreview", data);
|
rc.put(BODY_PREVIEW, data);
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
rc.put("BodyPreview", new byte[] {});
|
rc.put(BODY_PREVIEW, new byte[] {});
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
@ -181,22 +287,23 @@ public final class OpenTypeSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||||
|
|
||||||
protected String getTypeName() {
|
protected String getTypeName() {
|
||||||
return ActiveMQMapMessage.class.getName();
|
return ActiveMQMapMessage.class.getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void init() throws OpenDataException {
|
protected void init() throws OpenDataException {
|
||||||
super.init();
|
super.init();
|
||||||
addItem("ContentMap", "Content map", SimpleType.STRING);
|
addItem(CONTENT_MAP, "Content map", SimpleType.STRING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
||||||
ActiveMQMapMessage m = (ActiveMQMapMessage)o;
|
ActiveMQMapMessage m = (ActiveMQMapMessage)o;
|
||||||
Map<String, Object> rc = super.getFields(o);
|
Map<String, Object> rc = super.getFields(o);
|
||||||
try {
|
try {
|
||||||
rc.put("ContentMap", "" + m.getContentMap());
|
rc.put(CONTENT_MAP, "" + m.getContentMap());
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
rc.put("ContentMap", "");
|
rc.put(CONTENT_MAP, "");
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
@ -233,22 +340,23 @@ public final class OpenTypeSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||||
|
|
||||||
protected String getTypeName() {
|
protected String getTypeName() {
|
||||||
return ActiveMQTextMessage.class.getName();
|
return ActiveMQTextMessage.class.getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void init() throws OpenDataException {
|
protected void init() throws OpenDataException {
|
||||||
super.init();
|
super.init();
|
||||||
addItem("Text", "Text", SimpleType.STRING);
|
addItem(MESSAGE_TEXT, MESSAGE_TEXT, SimpleType.STRING);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
public Map<String, Object> getFields(Object o) throws OpenDataException {
|
||||||
ActiveMQTextMessage m = (ActiveMQTextMessage)o;
|
ActiveMQTextMessage m = (ActiveMQTextMessage)o;
|
||||||
Map<String, Object> rc = super.getFields(o);
|
Map<String, Object> rc = super.getFields(o);
|
||||||
try {
|
try {
|
||||||
rc.put("Text", "" + m.getText());
|
rc.put(MESSAGE_TEXT, "" + m.getText());
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
rc.put("Text", "");
|
rc.put(MESSAGE_TEXT, "");
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.activemq.broker.jmx;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
@ -30,6 +32,7 @@ import javax.management.openmbean.CompositeData;
|
||||||
import javax.management.openmbean.TabularData;
|
import javax.management.openmbean.TabularData;
|
||||||
import junit.textui.TestRunner;
|
import junit.textui.TestRunner;
|
||||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.BaseDestination;
|
import org.apache.activemq.broker.region.BaseDestination;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -72,6 +75,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
||||||
|
|
||||||
// test all the various MBeans now we have a producer, consumer and
|
// test all the various MBeans now we have a producer, consumer and
|
||||||
// messages on a queue
|
// messages on a queue
|
||||||
|
assertSendViaMBean();
|
||||||
assertQueueBrowseWorks();
|
assertQueueBrowseWorks();
|
||||||
assertCreateAndDestroyDurableSubscriptions();
|
assertCreateAndDestroyDurableSubscriptions();
|
||||||
assertConsumerCounts();
|
assertConsumerCounts();
|
||||||
|
@ -126,6 +130,77 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
||||||
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
|
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected void assertSendViaMBean() throws Exception {
|
||||||
|
String queueName = getDestinationString() + ".SendMBBean";
|
||||||
|
|
||||||
|
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||||
|
echo("Create QueueView MBean...");
|
||||||
|
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||||
|
broker.addQueue(queueName);
|
||||||
|
|
||||||
|
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + queueName + ",BrokerName=localhost");
|
||||||
|
|
||||||
|
echo("Create QueueView MBean...");
|
||||||
|
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
|
||||||
|
int count = 5;
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
String body = "message:" + i;
|
||||||
|
|
||||||
|
Map headers = new HashMap();
|
||||||
|
headers.put("JMSCorrelationID", "MyCorrId");
|
||||||
|
headers.put("JMSDeliveryMode", Boolean.TRUE);
|
||||||
|
headers.put("JMSXGroupID", "MyGroupID");
|
||||||
|
headers.put("JMSXGroupSeq", 1234);
|
||||||
|
headers.put("JMSPriority", i);
|
||||||
|
headers.put("JMSType", "MyType");
|
||||||
|
headers.put("MyHeader", i);
|
||||||
|
headers.put("MyStringHeader", "StringHeader" + i);
|
||||||
|
|
||||||
|
proxy.sendTextMessage(headers, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
CompositeData[] compdatalist = proxy.browse();
|
||||||
|
if (compdatalist.length == 0) {
|
||||||
|
fail("There is no message in the queue:");
|
||||||
|
}
|
||||||
|
String[] messageIDs = new String[compdatalist.length];
|
||||||
|
|
||||||
|
for (int i = 0; i < compdatalist.length; i++) {
|
||||||
|
CompositeData cdata = compdatalist[i];
|
||||||
|
|
||||||
|
if (i == 0) {
|
||||||
|
echo("Columns: " + cdata.getCompositeType().keySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertComplexData(cdata, "JMSCorrelationID", "MyCorrId");
|
||||||
|
assertComplexData(cdata, "JMSPriority", i);
|
||||||
|
assertComplexData(cdata, "JMSType", "MyType");
|
||||||
|
assertComplexData(cdata, "JMSCorrelationID", "MyCorrId");
|
||||||
|
assertComplexData(cdata, "PropertiesText", "{MyStringHeader=StringHeader" + i + ", MyHeader=" + i + "}");
|
||||||
|
|
||||||
|
Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
|
||||||
|
assertEquals("intProperties size()", 1, intProperties.size());
|
||||||
|
assertEquals("intProperties.MyHeader", i, intProperties.get("MyHeader"));
|
||||||
|
|
||||||
|
Map stringProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.STRING_PROPERTIES);
|
||||||
|
assertEquals("stringProperties size()", 1, stringProperties.size());
|
||||||
|
assertEquals("stringProperties.MyHeader", "StringHeader" + i, stringProperties.get("MyStringHeader"));
|
||||||
|
|
||||||
|
assertComplexData(cdata, "JMSXGroupSeq", 1234);
|
||||||
|
assertComplexData(cdata, "JMSXGroupID", "MyGroupID");
|
||||||
|
assertComplexData(cdata, "Text", "message:" + i);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void assertComplexData(CompositeData cdata, String name, Object expected) {
|
||||||
|
Object value = cdata.get(name);
|
||||||
|
assertEquals("CData field: " + name, expected, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void assertQueueBrowseWorks() throws Exception {
|
protected void assertQueueBrowseWorks() throws Exception {
|
||||||
Integer mbeancnt = mbeanServer.getMBeanCount();
|
Integer mbeancnt = mbeanServer.getMBeanCount();
|
||||||
echo("Mbean count :" + mbeancnt);
|
echo("Mbean count :" + mbeancnt);
|
||||||
|
@ -309,6 +384,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
||||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||||
Message message = session.createTextMessage("Message: " + i);
|
Message message = session.createTextMessage("Message: " + i);
|
||||||
message.setIntProperty("counter", i);
|
message.setIntProperty("counter", i);
|
||||||
|
message.setJMSCorrelationID("MyCorrelationID");
|
||||||
|
message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
|
||||||
|
message.setJMSType("MyType");
|
||||||
|
message.setJMSPriority(5);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
}
|
}
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
|
Loading…
Reference in New Issue