From ea3c3e0aef9336a233e31d1872743f1ba1f61bca Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Thu, 7 Jan 2016 14:42:03 +0000 Subject: [PATCH] ARTEMIS-334 - Add Management browse functionality similar to ActiveMQ https://issues.apache.org/jira/browse/ARTEMIS-334 --- .../activemq/artemis/api/core/Message.java | 6 + .../api/core/management/ManagementHelper.java | 31 +- .../api/core/management/QueueControl.java | 7 + .../core/message/impl/MessageImpl.java | 7 + .../api/jms/management/JMSQueueControl.java | 14 + .../artemis/jms/client/ActiveMQMessage.java | 9 + .../management/impl/JMSQueueControlImpl.java | 34 ++ .../openmbean/JMSCompositeDataConstants.java | 58 +++ .../impl/openmbean/JMSOpenTypeSupport.java | 353 ++++++++++++++++++ .../impl/FileConfigurationParser.java | 5 + .../management/impl/QueueControlImpl.java | 38 ++ .../openmbean/CompositeDataConstants.java | 66 ++++ .../impl/openmbean/OpenTypeSupport.java | 256 +++++++++++++ .../core/settings/impl/AddressSettings.java | 35 +- .../schema/artemis-configuration.xsd | 10 + .../impl/ScheduledDeliveryHandlerTest.java | 5 + .../integration/client/AcknowledgeTest.java | 5 + .../management/JMSQueueControlTest.java | 210 +++++++++++ .../JMSQueueControlUsingJMSTest.java | 16 + .../jms/server/management/JMSUtil.java | 43 +++ .../management/QueueControlUsingCoreTest.java | 7 + 21 files changed, 1213 insertions(+), 2 deletions(-) create mode 100644 artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java create mode 100644 artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 77bb7cb53d..f5cb55bd6c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -579,4 +579,10 @@ public interface Message { * @return Returns the message in Map form, useful when encoding to JSON */ Map toMap(); + + + /** + * @return Returns the message properties in Map form, useful when encoding to JSON + */ + Map toPropertyMap(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index fb7d88bfcf..98fafc4da3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core.management; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -24,9 +26,13 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.json.JSONArray; import org.apache.activemq.artemis.utils.json.JSONObject; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; + /** * Helper class to use ActiveMQ Artemis Core messages to manage server resources. */ @@ -178,7 +184,20 @@ public final class ManagementHelper { if (clz.isArray()) { Object[] innerArray = (Object[]) parameter; - jsonArray.put(ManagementHelper.toJSONArray(innerArray)); + if (innerArray instanceof CompositeData[]) { + JSONArray jsonArray1 = new JSONArray(); + for (Object data : innerArray) { + String s = Base64.encodeObject((CompositeDataSupport)data); + jsonArray1.put(s); + } + JSONObject jsonObject = new JSONObject(); + jsonObject.put(CompositeData.class.getName(), jsonArray1); + jsonArray.put(jsonObject); + System.out.println("ManagementHelper.toJSONArray"); + } + else { + jsonArray.put(ManagementHelper.toJSONArray(innerArray)); + } } else { ManagementHelper.checkType(parameter); @@ -235,6 +254,16 @@ public final class ManagementHelper { innerVal = ((Integer) innerVal).longValue(); } + if (CompositeData.class.getName().equals(key)) { + Object[] data = (Object[]) innerVal; + CompositeData[] cds = new CompositeData[data.length]; + for (int i1 = 0; i1 < data.length; i1++) { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(Base64.decode((data[i1].toString())))); + cds[i1] = (CompositeDataSupport) ois.readObject(); + } + innerVal = cds; + } + map.put(key, innerVal); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 7d325ece03..298f1e724c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.api.core.management; import javax.management.MBeanOperationInfo; +import javax.management.openmbean.CompositeData; import java.util.Map; /** @@ -377,6 +378,12 @@ public interface QueueControl { @Operation(desc = "Inspects if the queue is paused", impact = MBeanOperationInfo.INFO) boolean isPaused() throws Exception; + /** + * Resets the MessagesAdded property + */ + @Operation(desc = "Browse Messages", impact = MBeanOperationInfo.ACTION) + CompositeData[] browse(String filter) throws Exception; + /** * Resets the MessagesAdded property */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index f27402eae7..783cf00d00 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -420,6 +420,13 @@ public abstract class MessageImpl implements MessageInternal { map.put("expiration", expiration); map.put("timestamp", timestamp); map.put("priority", priority); + map.putAll(toPropertyMap()); + return map; + } + + @Override + public Map toPropertyMap() { + Map map = new HashMap<>(); for (SimpleString propName : properties.getPropertyNames()) { map.put(propName.toString(), properties.getProperty(propName)); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index 7dbc9574ac..d1251fa843 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management; import java.util.Map; import javax.management.MBeanOperationInfo; +import javax.management.openmbean.CompositeData; import org.apache.activemq.artemis.api.core.management.Operation; import org.apache.activemq.artemis.api.core.management.Parameter; @@ -297,6 +298,19 @@ public interface JMSQueueControl extends DestinationControl { @Operation(desc = "Resume the queue.", impact = MBeanOperationInfo.ACTION) void resume() throws Exception; + + /** + * Resumes the queue. Messages are again delivered to its consumers. + */ + @Operation(desc = "Browse the queue.", impact = MBeanOperationInfo.ACTION) + CompositeData[] browse() throws Exception; + + /** + * Resumes the queue. Messages are again delivered to its consumers. + */ + @Operation(desc = "Browse the queue.", impact = MBeanOperationInfo.ACTION) + CompositeData[] browse(String filter) throws Exception; + @Operation(desc = "List all the existent consumers on the Queue") String listConsumersAsJSON() throws Exception; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 528a8a3e49..eee6327fac 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -25,6 +25,8 @@ import javax.jms.JMSRuntimeException; import javax.jms.Message; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.OutputStream; @@ -93,6 +95,13 @@ public class ActiveMQMessage implements javax.jms.Message { return jmsMessage; } + + + public static CompositeData coreCompositeTypeToJMSCompositeType(CompositeDataSupport data) throws Exception { + CompositeData jmsdata = new CompositeDataSupport(data.getCompositeType(), new HashMap()); + return jmsdata; + } + // Static -------------------------------------------------------- private static final HashSet reservedIdentifiers = new HashSet<>(); diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index d279f8e016..130f418ec3 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -16,12 +16,17 @@ */ package org.apache.activemq.artemis.jms.management.impl; +import javax.jms.InvalidSelectorException; import javax.management.MBeanInfo; import javax.management.StandardMBean; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.management.MessageCounterInfo; import org.apache.activemq.artemis.api.core.management.Operation; @@ -33,6 +38,7 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.jms.client.SelectorTranslator; +import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.utils.json.JSONArray; import org.apache.activemq.artemis.utils.json.JSONObject; @@ -205,6 +211,10 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro return jmsMessages; } + private CompositeData toJMSCompositeType(CompositeDataSupport data) throws Exception { + return JMSOpenTypeSupport.convert(data); + } + @Override public Map[] listScheduledMessages() throws Exception { Map[] coreMessages = coreQueueControl.listScheduledMessages(); @@ -405,6 +415,30 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro coreQueueControl.resume(); } + @Override + public CompositeData[] browse() throws Exception { + return browse(null); + } + + @Override + public CompositeData[] browse(String filter) throws Exception { + try { + CompositeData[] messages = coreQueueControl.browse(filter); + + ArrayList c = new ArrayList<>(); + + for (CompositeData message : messages) { + c.add(toJMSCompositeType((CompositeDataSupport) message)); + } + CompositeData[] rc = new CompositeData[c.size()]; + c.toArray(rc); + return rc; + } + catch (ActiveMQInvalidFilterExpressionException e) { + throw new InvalidSelectorException(e.getMessage()); + } + } + @Override public String getSelector() { return coreQueueControl.getFilter(); diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java new file mode 100644 index 0000000000..e75b2f5940 --- /dev/null +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSCompositeDataConstants.java @@ -0,0 +1,58 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.management.impl.openmbean; + +public interface JMSCompositeDataConstants { + String JMS_DESTINATION = "JMSDestination"; + String JMS_MESSAGE_ID = "JMSMessageID"; + String JMS_TYPE = "JMSType"; + String JMS_DELIVERY_MODE = "JMSDeliveryMode"; + String JMS_EXPIRATION = "JMSExpiration"; + String JMS_PRIORITY = "JMSPriority"; + String JMS_REDELIVERED = "JMSRedelivered"; + String JMS_TIMESTAMP = "JMSTimestamp"; + String JMSXGROUP_SEQ = "JMSXGroupSeq"; + String JMSXGROUP_ID = "JMSXGroupID"; + String JMSXUSER_ID = "JMSXUserID"; + String JMS_CORRELATION_ID = "JMSCorrelationID"; + String ORIGINAL_DESTINATION = "OriginalDestination"; + String JMS_REPLY_TO = "JMSReplyTo"; + + String JMS_DESTINATION_DESCRIPTION = "The message destination"; + String JMS_MESSAGE_ID_DESCRIPTION = "The message ID"; + String JMS_TYPE_DESCRIPTION = "The message type"; + String JMS_DELIVERY_MODE_DESCRIPTION = "The message delivery mode"; + String JMS_EXPIRATION_DESCRIPTION = "The message expiration"; + String JMS_PRIORITY_DESCRIPTION = "The message priority"; + String JMS_REDELIVERED_DESCRIPTION = "Is the message redelivered"; + String JMS_TIMESTAMP_DESCRIPTION = "The message timestamp"; + String JMSXGROUP_SEQ_DESCRIPTION = "The message group sequence number"; + String JMSXGROUP_ID_DESCRIPTION = "The message group ID"; + String JMSXUSER_ID_DESCRIPTION = "The user that sent the message"; + String JMS_CORRELATION_ID_DESCRIPTION = "The message correlation ID"; + String ORIGINAL_DESTINATION_DESCRIPTION = "Original Destination Before Senting To DLQ"; + String JMS_REPLY_TO_DESCRIPTION = "The reply to address"; + + String BODY_LENGTH = "BodyLength"; + String BODY_PREVIEW = "BodyPreview"; + String CONTENT_MAP = "ContentMap"; + String MESSAGE_TEXT = "Text"; + String MESSAGE_URL = "Url"; + + +} diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java new file mode 100644 index 0000000000..14d8c841b8 --- /dev/null +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/openmbean/JMSOpenTypeSupport.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.management.impl.openmbean; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.reader.MapMessageUtil; +import org.apache.activemq.artemis.utils.TypedProperties; + +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public final class JMSOpenTypeSupport { + + public interface OpenTypeFactory { + CompositeType getCompositeType() throws OpenDataException; + + Map getFields(CompositeDataSupport data) throws OpenDataException; + } + + private static final Map OPEN_TYPE_FACTORIES = new HashMap<>(); + + public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory { + + private CompositeType compositeType; + private final List itemNamesList = new ArrayList(); + private final List itemDescriptionsList = new ArrayList(); + private final List itemTypesList = new ArrayList(); + + public CompositeType getCompositeType() throws OpenDataException { + if (compositeType == null) { + init(); + compositeType = createCompositeType(); + } + return compositeType; + } + + protected void init() throws OpenDataException { + } + + protected CompositeType createCompositeType() throws OpenDataException { + String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); + String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); + OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); + return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); + } + + protected abstract String getTypeName(); + + protected void addItem(String name, String description, OpenType type) { + itemNamesList.add(name); + itemDescriptionsList.add(description); + itemTypesList.add(type); + } + + protected String getDescription() { + return getTypeName(); + } + + public Map getFields(CompositeDataSupport data) throws OpenDataException { + Map rc = new HashMap(); + return rc; + } + } + + static class MessageOpenTypeFactory extends AbstractOpenTypeFactory { + protected TabularType stringPropertyTabularType; + protected TabularType booleanPropertyTabularType; + protected TabularType bytePropertyTabularType; + protected TabularType shortPropertyTabularType; + protected TabularType intPropertyTabularType; + protected TabularType longPropertyTabularType; + protected TabularType floatPropertyTabularType; + protected TabularType doublePropertyTabularType; + + protected ArrayType body; + + @Override + protected String getTypeName() { + return Message.class.getName(); + } + + @Override + protected void init() throws OpenDataException { + super.init(); + + addItem(JMSCompositeDataConstants.JMS_DESTINATION, JMSCompositeDataConstants.JMS_DESTINATION_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMS_MESSAGE_ID, JMSCompositeDataConstants.JMS_MESSAGE_ID_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMS_TYPE, JMSCompositeDataConstants.JMS_TYPE_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMS_DELIVERY_MODE, JMSCompositeDataConstants.JMS_DELIVERY_MODE_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMS_EXPIRATION, JMSCompositeDataConstants.JMS_EXPIRATION_DESCRIPTION, SimpleType.LONG); + addItem(JMSCompositeDataConstants.JMS_PRIORITY, JMSCompositeDataConstants.JMS_PRIORITY_DESCRIPTION, SimpleType.INTEGER); + addItem(JMSCompositeDataConstants.JMS_REDELIVERED, JMSCompositeDataConstants.JMS_REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); + addItem(JMSCompositeDataConstants.JMS_TIMESTAMP, JMSCompositeDataConstants.JMS_TIMESTAMP_DESCRIPTION, SimpleType.DATE); + addItem(JMSCompositeDataConstants.JMSXGROUP_ID, JMSCompositeDataConstants.JMSXGROUP_ID_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ_DESCRIPTION, SimpleType.INTEGER); + addItem(JMSCompositeDataConstants.JMSXUSER_ID, JMSCompositeDataConstants.JMSXUSER_ID_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.JMS_REPLY_TO, JMSCompositeDataConstants.JMS_REPLY_TO_DESCRIPTION, SimpleType.STRING); + addItem(JMSCompositeDataConstants.ORIGINAL_DESTINATION, JMSCompositeDataConstants.ORIGINAL_DESTINATION_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); + + // now lets expose the type safe properties + stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); + booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); + bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); + shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); + intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); + longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); + floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); + doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); + + addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); + addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); + addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); + addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); + addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); + addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); + addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); + addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); + } + + @Override + public Map getFields(CompositeDataSupport data) throws OpenDataException { + Map rc = super.getFields(data); + putString(rc, data, JMSCompositeDataConstants.JMS_MESSAGE_ID, CompositeDataConstants.USER_ID); + putString(rc, data, JMSCompositeDataConstants.JMS_DESTINATION, CompositeDataConstants.ADDRESS); + putStringProperty(rc, data, JMSCompositeDataConstants.JMS_REPLY_TO, "JMSReplyTo"); + rc.put(JMSCompositeDataConstants.JMS_TYPE, getType()); + rc.put(JMSCompositeDataConstants.JMS_DELIVERY_MODE, ((Boolean)data.get(CompositeDataConstants.DURABLE)) ? "PERSISTENT" : "NON-PERSISTENT"); + rc.put(JMSCompositeDataConstants.JMS_EXPIRATION, data.get(CompositeDataConstants.EXPIRATION)); + rc.put(JMSCompositeDataConstants.JMS_TIMESTAMP, new Date((Long) data.get(CompositeDataConstants.TIMESTAMP))); + rc.put(JMSCompositeDataConstants.JMS_PRIORITY, ((Byte) data.get(CompositeDataConstants.PRIORITY)).intValue()); + putStringProperty(rc, data, JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID); + rc.put(JMSCompositeDataConstants.JMS_REDELIVERED, data.get(CompositeDataConstants.REDELIVERED)); + putStringProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_ID, Message.HDR_GROUP_ID.toString()); + putIntProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ); + putStringProperty(rc, data, JMSCompositeDataConstants.JMSXUSER_ID, JMSCompositeDataConstants.JMSXUSER_ID); + putStringProperty(rc, data, JMSCompositeDataConstants.ORIGINAL_DESTINATION, Message.HDR_ORIGINAL_ADDRESS.toString()); + + rc.put(CompositeDataConstants.PROPERTIES, "" + data.get(CompositeDataConstants.PROPERTIES)); + + rc.put(CompositeDataConstants.STRING_PROPERTIES, data.get(CompositeDataConstants.STRING_PROPERTIES)); + rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, data.get(CompositeDataConstants.BOOLEAN_PROPERTIES)); + rc.put(CompositeDataConstants.BYTE_PROPERTIES, data.get(CompositeDataConstants.BYTE_PROPERTIES)); + rc.put(CompositeDataConstants.SHORT_PROPERTIES, data.get(CompositeDataConstants.SHORT_PROPERTIES)); + rc.put(CompositeDataConstants.INT_PROPERTIES, data.get(CompositeDataConstants.INT_PROPERTIES)); + rc.put(CompositeDataConstants.LONG_PROPERTIES, data.get(CompositeDataConstants.LONG_PROPERTIES)); + rc.put(CompositeDataConstants.FLOAT_PROPERTIES, data.get(CompositeDataConstants.FLOAT_PROPERTIES)); + rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, data.get(CompositeDataConstants.DOUBLE_PROPERTIES)); + + return rc; + } + + private void putString(Map rc, CompositeDataSupport data, String target, String source) { + String prop = (String) data.get(source); + if (prop != null) { + rc.put(target, prop); + } + else { + rc.put(target, ""); + } + } + + private void putStringProperty(Map rc, CompositeDataSupport data, String target, String source) { + TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.STRING_PROPERTIES); + Object[] keys = new Object[]{source}; + CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys); + String prop = ""; + if (cds != null && cds.get("value") != null) { + prop = (String) cds.get("value"); + } + rc.put(target, prop); + } + + private void putIntProperty(Map rc, CompositeDataSupport data, String target, String source) { + TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.INT_PROPERTIES); + Object[] keys = new Object[]{source}; + CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys); + Integer prop = 0; + if (cds != null && cds.get("value") != null) { + prop = (Integer) cds.get("value"); + } + rc.put(target, prop); + } + + private String getType() { + return "Message"; + } + + protected String toString(Object value) { + if (value == null) { + return null; + } + return value.toString(); + } + + + protected TabularType createTabularType(Class type, OpenType openType) throws OpenDataException { + String typeName = "java.util.Map"; + String[] keyValue = new String[]{"key", "value"}; + OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; + CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); + return new TabularType(typeName, typeName, rowType, new String[]{"key"}); + } + } + + static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory { + + + @Override + protected String getTypeName() { + return "BytesMessage"; + } + + @Override + protected void init() throws OpenDataException { + super.init(); + addItem(JMSCompositeDataConstants.BODY_LENGTH, "Body length", SimpleType.LONG); + addItem(JMSCompositeDataConstants.BODY_PREVIEW, "Body preview", new ArrayType(SimpleType.BYTE, true)); + } + + @Override + public Map getFields(CompositeDataSupport data) throws OpenDataException { + Map rc = super.getFields(data); + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body")); + long length = 0; + length = buffer.readableBytes(); + rc.put(JMSCompositeDataConstants.BODY_LENGTH, Long.valueOf(length)); + byte[] preview = new byte[(int) Math.min(length, 255)]; + buffer.readBytes(preview); + rc.put(JMSCompositeDataConstants.BODY_PREVIEW, preview); + return rc; + } + } + + static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory { + + @Override + protected String getTypeName() { + return "MapMessage"; + } + + @Override + protected void init() throws OpenDataException { + super.init(); + addItem(JMSCompositeDataConstants.CONTENT_MAP, "Content map", SimpleType.STRING); + } + + @Override + public Map getFields(CompositeDataSupport data) throws OpenDataException { + Map rc = super.getFields(data); + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body")); + TypedProperties properties = new TypedProperties(); + MapMessageUtil.readBodyMap(buffer, properties); + rc.put(JMSCompositeDataConstants.CONTENT_MAP, "" + properties.getMap()); + return rc; + } + } + + static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override + protected String getTypeName() { + return "ObjectMessage"; + } + } + static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override + protected String getTypeName() { + return "StreamMessage"; + } + } + + static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { + + @Override + protected String getTypeName() { + return "TextMessage"; + } + + @Override + protected void init() throws OpenDataException { + super.init(); + addItem(JMSCompositeDataConstants.MESSAGE_TEXT, JMSCompositeDataConstants.MESSAGE_TEXT, SimpleType.STRING); + } + + @Override + public Map getFields(CompositeDataSupport data) throws OpenDataException { + Map rc = super.getFields(data); + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body")); + SimpleString value = buffer.readNullableSimpleString(); + rc.put(JMSCompositeDataConstants.MESSAGE_TEXT, value != null ? value.toString() : ""); + return rc; + } + + } + + static { + OPEN_TYPE_FACTORIES.put(Message.DEFAULT_TYPE, new MessageOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(Message.TEXT_TYPE, new TextMessageOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(Message.BYTES_TYPE, new ByteMessageOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(Message.MAP_TYPE, new MapMessageOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(Message.OBJECT_TYPE, new ObjectMessageOpenTypeFactory()); + OPEN_TYPE_FACTORIES.put(Message.STREAM_TYPE, new StreamMessageOpenTypeFactory()); + } + + private JMSOpenTypeSupport() { + } + + public static OpenTypeFactory getFactory(Byte type) throws OpenDataException { + return OPEN_TYPE_FACTORIES.get(type); + } + + public static CompositeData convert(CompositeDataSupport data) throws OpenDataException { + OpenTypeFactory f = getFactory((Byte) data.get("type")); + if (f == null) { + throw new OpenDataException("Cannot create a CompositeData for type: " + data.get("type")); + } + CompositeType ct = f.getCompositeType(); + Map fields = f.getFields(data); + return new CompositeDataSupport(ct, fields); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index fab9f6dfb8..019ebb38d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -161,6 +161,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues"; + private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size"; + private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections"; private static final String MAX_QUEUES_NODE_NAME = "max-queues"; @@ -778,6 +780,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name)) { addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child)); } + else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) { + addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child)); + } } return setting; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 26d22a0074..a606a0de29 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.management.impl; import javax.management.MBeanOperationInfo; +import javax.management.openmbean.CompositeData; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; +import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport; import org.apache.activemq.artemis.core.messagecounter.MessageCounter; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -855,6 +857,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public CompositeData[] browse(String filterStr) throws Exception { + checkStarted(); + + clearIO(); + try { + int pageSize = addressSettingsRepository.getMatch(queue.getName().toString()).getManagementBrowsePageSize(); + int currentPageSize = 0; + ArrayList c = new ArrayList<>(); + Filter filter = FilterImpl.createFilter(filterStr); + queue.flushExecutor(); + LinkedListIterator iterator = queue.totalIterator(); + try { + while (iterator.hasNext() && currentPageSize++ < pageSize) { + MessageReference ref = iterator.next(); + if (filter == null || filter.match(ref.getMessage())) { + c.add(OpenTypeSupport.convert(ref)); + + } + } + CompositeData[] rc = new CompositeData[c.size()]; + c.toArray(rc); + return rc; + } + finally { + iterator.close(); + } + } + catch (ActiveMQException e) { + throw new IllegalStateException(e.getMessage()); + } + finally { + blockOnIO(); + } + } + @Override public void flushExecutor() { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java new file mode 100644 index 0000000000..d34165e0f1 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl.openmbean; + +public interface CompositeDataConstants { + + String ADDRESS = "address"; + String MESSAGE_ID = "messageID"; + String USER_ID = "userID"; + String TYPE = "type"; + String DURABLE = "durable"; + String EXPIRATION = "expiration"; + String PRIORITY = "priority"; + String REDELIVERED = "redelivered"; + String TIMESTAMP = "timestamp"; + String BODY = "body"; + String PROPERTIES = "PropertiesText"; + + String ADDRESS_DESCRIPTION = "The Address"; + String MESSAGE_ID_DESCRIPTION = " The message ID"; + String USER_ID_DESCRIPTION = "The user ID"; + String TYPE_DESCRIPTION = "The message type"; + String DURABLE_DESCRIPTION = "Is the message durable"; + String EXPIRATION_DESCRIPTION = "The message expiration"; + String PRIORITY_DESCRIPTION = "The message priority"; + String REDELIVERED_DESCRIPTION = "Has the message been redelivered"; + String TIMESTAMP_DESCRIPTION = "The message timestamp"; + String BODY_DESCRIPTION = "The message body"; + String PROPERTIES_DESCRIPTION = "The properties text"; + + + // User properties + String STRING_PROPERTIES = "StringProperties"; + String BOOLEAN_PROPERTIES = "BooleanProperties"; + String BYTE_PROPERTIES = "ByteProperties"; + String SHORT_PROPERTIES = "ShortProperties"; + String INT_PROPERTIES = "IntProperties"; + String LONG_PROPERTIES = "LongProperties"; + String FLOAT_PROPERTIES = "FloatProperties"; + String DOUBLE_PROPERTIES = "DoubleProperties"; + + String STRING_PROPERTIES_DESCRIPTION = "User String Properties"; + String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties"; + String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties"; + String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties"; + String INT_PROPERTIES_DESCRIPTION = "User Int Properties"; + String LONG_PROPERTIES_DESCRIPTION = "User Long Properties"; + String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties"; + String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties"; + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java new file mode 100644 index 0000000000..50d9c999ac --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.management.impl.openmbean; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.MessageReference; + +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public final class OpenTypeSupport { + + private static MessageOpenTypeFactory FACTORY = new MessageOpenTypeFactory(); + + private OpenTypeSupport() { + } + + public static CompositeData convert(MessageReference ref) throws OpenDataException { + CompositeType ct = FACTORY.getCompositeType(); + Map fields = FACTORY.getFields(ref); + return new CompositeDataSupport(ct, fields); + } + + + static class MessageOpenTypeFactory { + private CompositeType compositeType; + private final List itemNamesList = new ArrayList(); + private final List itemDescriptionsList = new ArrayList(); + private final List itemTypesList = new ArrayList(); + + protected TabularType stringPropertyTabularType; + protected TabularType booleanPropertyTabularType; + protected TabularType bytePropertyTabularType; + protected TabularType shortPropertyTabularType; + protected TabularType intPropertyTabularType; + protected TabularType longPropertyTabularType; + protected TabularType floatPropertyTabularType; + protected TabularType doublePropertyTabularType; + + protected ArrayType body; + + protected String getTypeName() { + return Message.class.getName(); + } + + public CompositeType getCompositeType() throws OpenDataException { + if (compositeType == null) { + init(); + compositeType = createCompositeType(); + } + return compositeType; + } + + protected void init() throws OpenDataException { + + addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG); + addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG); + + addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); + + // now lets expose the type safe properties + stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); + booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); + bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); + shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); + intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); + longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); + floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); + doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); + + body = new ArrayType(SimpleType.BYTE, true); + + addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body); + + addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); + addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); + addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); + addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); + addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); + addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); + addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); + addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); + } + + public Map getFields(MessageReference ref) throws OpenDataException { + Map rc = new HashMap<>(); + Message m = ref.getMessage(); + rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); + if (m.getUserID() != null) { + rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); + } + else { + rc.put(CompositeDataConstants.USER_ID, ""); + } + rc.put(CompositeDataConstants.ADDRESS, m.getAddress().toString()); + rc.put(CompositeDataConstants.TYPE, m.getType()); + rc.put(CompositeDataConstants.DURABLE, m.isDurable()); + rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration()); + rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp()); + rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); + rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); + + ActiveMQBuffer bodyCopy = m.getBodyBufferCopy(); + byte[] bytes = new byte[bodyCopy.readableBytes()]; + bodyCopy.readBytes(bytes); + rc.put(CompositeDataConstants.BODY, bytes); + + Map propertyMap = m.toPropertyMap(); + + rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap); + + try { + rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(propertyMap, stringPropertyTabularType, String.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(propertyMap, booleanPropertyTabularType, Boolean.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(propertyMap, bytePropertyTabularType, Byte.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(propertyMap, shortPropertyTabularType, Short.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(propertyMap, intPropertyTabularType, Integer.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(propertyMap, longPropertyTabularType, Long.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(propertyMap, floatPropertyTabularType, Float.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType)); + } + try { + rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(propertyMap, doublePropertyTabularType, Double.class)); + } + catch (IOException e) { + rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType)); + } + return rc; + } + + protected String toString(Object value) { + if (value == null) { + return null; + } + return value.toString(); + } + + protected CompositeType createCompositeType() throws OpenDataException { + String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); + String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); + OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); + return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); + } + + protected String getDescription() { + return getTypeName(); + } + + protected TabularType createTabularType(Class type, OpenType openType) throws OpenDataException { + String typeName = "java.util.Map"; + String[] keyValue = new String[]{"key", "value"}; + OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; + CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); + return new TabularType(typeName, typeName, rowType, new String[]{"key"}); + } + + protected TabularDataSupport createTabularData(Map entries, TabularType type, Class valueType) throws IOException, OpenDataException { + TabularDataSupport answer = new TabularDataSupport(type); + + for (String key : entries.keySet()) { + Object value = entries.get(key); + if (valueType.isInstance(value)) { + CompositeDataSupport compositeData = createTabularRowValue(type, key, value); + answer.put(compositeData); + } + else if (valueType == String.class && value instanceof SimpleString) { + CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString()); + answer.put(compositeData); + } + } + return answer; + } + + protected CompositeDataSupport createTabularRowValue(TabularType type, String key, Object value) throws OpenDataException { + Map fields = new HashMap(); + fields.put("key", key); + fields.put("value", value); + return new CompositeDataSupport(type.getRowType(), fields); + } + + + protected void addItem(String name, String description, OpenType type) { + itemNamesList.add(name); + itemDescriptionsList.add(description); + itemTypesList.add(type); + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 313dabf8ce..3309fab22a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -66,6 +66,8 @@ public class AddressSettings implements Mergeable, Serializable public static final long DEFAULT_SLOW_CONSUMER_CHECK_PERIOD = 5; + public static final int MANAGEMENT_BROWSE_PAGE_SIZE = 200; + public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY; private AddressFullMessagePolicy addressFullMessagePolicy = null; @@ -110,6 +112,8 @@ public class AddressSettings implements Mergeable, Serializable private Boolean autoDeleteJmsQueues = null; + private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; + public AddressSettings(AddressSettings other) { this.addressFullMessagePolicy = other.addressFullMessagePolicy; this.maxSizeBytes = other.maxSizeBytes; @@ -132,6 +136,7 @@ public class AddressSettings implements Mergeable, Serializable this.slowConsumerPolicy = other.slowConsumerPolicy; this.autoCreateJmsQueues = other.autoCreateJmsQueues; this.autoDeleteJmsQueues = other.autoDeleteJmsQueues; + this.managementBrowsePageSize = other.managementBrowsePageSize; } public AddressSettings() { @@ -319,6 +324,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public int getManagementBrowsePageSize() { + return managementBrowsePageSize != null ? managementBrowsePageSize : AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; + } + + public AddressSettings setManagementBrowsePageSize(int managementBrowsePageSize) { + this.managementBrowsePageSize = managementBrowsePageSize; + return this; + } + /** * merge 2 objects in to 1 * @@ -386,6 +400,9 @@ public class AddressSettings implements Mergeable, Serializable if (autoDeleteJmsQueues == null) { autoDeleteJmsQueues = merged.autoDeleteJmsQueues; } + if (managementBrowsePageSize == null) { + managementBrowsePageSize = merged.managementBrowsePageSize; + } } @Override @@ -445,6 +462,8 @@ public class AddressSettings implements Mergeable, Serializable autoCreateJmsQueues = BufferHelper.readNullableBoolean(buffer); autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer); + + managementBrowsePageSize = BufferHelper.readNullableInteger(buffer); } @Override @@ -470,7 +489,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableLong(slowConsumerThreshold) + BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) + BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) + - BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues); + BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) + + BufferHelper.sizeOfNullableInteger(managementBrowsePageSize); } @Override @@ -516,6 +536,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, autoCreateJmsQueues); BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues); + + BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize); } /* (non-Javadoc) @@ -546,6 +568,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode()); result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode()); result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode()); + result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode()); return result; } @@ -687,6 +710,14 @@ public class AddressSettings implements Mergeable, Serializable } else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues)) return false; + else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize)) + return false; + if (managementBrowsePageSize == null) { + if (other.managementBrowsePageSize != null) + return false; + } + else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize)) + return false; return true; } @@ -736,6 +767,8 @@ public class AddressSettings implements Mergeable, Serializable autoCreateJmsQueues + ", autoDeleteJmsQueues=" + autoDeleteJmsQueues + + ", managementBrowsePageSize=" + + managementBrowsePageSize + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 09350ae5ae..59b0491051 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2189,6 +2189,16 @@ + + + + + how many message a management resource can browse + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 3f726f00cb..2448b29538 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -834,6 +834,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } + @Override + public Map toPropertyMap() { + return null; + } + @Override public FakeMessage writeBodyBufferBytes(byte[] bytes) { return this; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index f6f68bf495..6067f94658 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -691,6 +691,11 @@ public class AcknowledgeTest extends ActiveMQTestBase { return null; } + @Override + public Map toPropertyMap() { + return null; + } + @Override public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) { return this; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java index fbac0884de..d86e0bd7aa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java @@ -16,15 +16,19 @@ */ package org.apache.activemq.artemis.tests.integration.jms.server.management; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.Notification; +import javax.management.openmbean.CompositeData; import javax.naming.Context; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,6 +60,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.jms.server.management.JMSNotificationType; +import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase; import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext; @@ -150,6 +155,211 @@ public class JMSQueueControlTest extends ManagementTestBase { Assert.assertEquals(0, data.length); } + @Test + public void testBrowseMessagesWithNullFilter() throws Exception { + JMSQueueControl queueControl = createManagementControl(); + + Assert.assertEquals(0, getMessageCount(queueControl)); + + String[] ids = JMSUtil.sendMessages(queue, 2); + + Assert.assertEquals(2, getMessageCount(queueControl)); + + CompositeData[] data = queueControl.browse(); + Assert.assertEquals(2, data.length); + System.out.println(data[0]); + Assert.assertEquals(ids[0], data[0].get("JMSMessageID").toString()); + Assert.assertEquals(ids[1], data[1].get("JMSMessageID").toString()); + + JMSUtil.consumeMessages(2, queue); + + data = queueControl.browse(); + Assert.assertEquals(0, data.length); + } + + @Test + public void testBrowseMessagesWithNullFilterReplyTo() throws Exception { + JMSQueueControl queueControl = createManagementControl(); + + Assert.assertEquals(0, getMessageCount(queueControl)); + Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName()); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Message m = JMSUtil.sendMessageWithReplyTo(session, queue, "foo"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + CompositeData[] data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("JMSReplyTo")); + Assert.assertEquals("jms.queue.foo", data[0].get("JMSReplyTo")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + data = queueControl.browse(); + Assert.assertEquals(0, data.length); + connection.close(); + } + + @Test + public void testBrowseMessagesWithAllProps() throws Exception { + JMSQueueControl queueControl = createManagementControl(); + + Assert.assertEquals(0, getMessageCount(queueControl)); + Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName()); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + JMSUtil.sendMessageWithProperty(session, queue, MessageUtil.CORRELATIONID_HEADER_NAME.toString(), "foo"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + CompositeData[] data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("JMSCorrelationID")); + Assert.assertEquals("foo", data[0].get("JMSCorrelationID")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + JMSUtil.sendMessageWithProperty(session, queue, MessageUtil.JMSXGROUPID.toString(), "myGroupID"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("JMSXGroupID")); + Assert.assertEquals("myGroupID", data[0].get("JMSXGroupID")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + JMSUtil.sendMessageWithProperty(session, queue, "JMSXGroupSeq", 33); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("JMSXGroupID")); + Assert.assertEquals(33, data[0].get("JMSXGroupSeq")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + JMSUtil.sendMessageWithProperty(session, queue, "JMSXUserID", "theheadhonch"); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("JMSXUserID")); + Assert.assertEquals("theheadhonch", data[0].get("JMSXUserID")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + data = queueControl.browse(); + Assert.assertEquals(0, data.length); + connection.close(); + } + + @Test + public void testBrowseBytesMessages() throws Exception { + JMSQueueControl queueControl = createManagementControl(); + + Assert.assertEquals(0, getMessageCount(queueControl)); + + ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName())); + + Connection conn = cf.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + byte[] bytes = RandomUtil.randomBytes(201); + + BytesMessage message = JMSUtil.sendByteMessage(session, queue, bytes); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + CompositeData[] data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("BodyLength")); + Assert.assertEquals(201L, data[0].get("BodyLength")); + Assert.assertNotNull(data[0].get("BodyPreview")); + Assert.assertArrayEquals(message.getBody(byte[].class), (byte[]) data[0].get("BodyPreview")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + bytes = RandomUtil.randomBytes(301); + + message = JMSUtil.sendByteMessage(session, queue, bytes); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + data = queueControl.browse(); + Assert.assertEquals(1, data.length); + Assert.assertNotNull(data[0].get("BodyLength")); + Assert.assertEquals(301L, data[0].get("BodyLength")); + Assert.assertNotNull(data[0].get("BodyPreview")); + byte[] body = message.getBody(byte[].class); + Assert.assertArrayEquals(Arrays.copyOf(body, 255), (byte[]) data[0].get("BodyPreview")); + System.out.println(data[0]); + JMSUtil.consumeMessages(1, queue); + + data = queueControl.browse(); + Assert.assertEquals(0, data.length); + conn.close(); + } + + @Test + public void testBrowseMapMessages() throws Exception { + JMSQueueControl queueControl = createManagementControl(); + + Assert.assertEquals(0, getMessageCount(queueControl)); + + ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName())); + + Connection conn = cf.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + MapMessage message = session.createMapMessage(); + message.setString("stringP", "aStringP"); + message.setBoolean("booleanP", true); + message.setByte("byteP", (byte) 1); + message.setChar("charP", 'q'); + message.setDouble("doubleP", 3.2); + message.setFloat("floatP", 4.5F); + message.setInt("intP", 8); + message.setLong("longP", 7); + message.setShort("shortP", (short) 777); + producer.send(message); + + Assert.assertEquals(1, getMessageCount(queueControl)); + + CompositeData[] data = queueControl.browse(); + Assert.assertEquals(1, data.length); + String contentMap = (String) data[0].get("ContentMap"); + Assert.assertNotNull(contentMap); + Assert.assertTrue(contentMap.contains("intP=8")); + Assert.assertTrue(contentMap.contains("floatP=4.5")); + Assert.assertTrue(contentMap.contains("longP=7")); + Assert.assertTrue(contentMap.contains("charP=q")); + Assert.assertTrue(contentMap.contains("byteP=1")); + Assert.assertTrue(contentMap.contains("doubleP=3.2")); + Assert.assertTrue(contentMap.contains("stringP=aStringP")); + Assert.assertTrue(contentMap.contains("booleanP=true")); + Assert.assertTrue(contentMap.contains("shortP=777")); + System.out.println(data[0]); + + JMSUtil.consumeMessages(1, queue); + + data = queueControl.browse(); + Assert.assertEquals(0, data.length); + conn.close(); + } + @Test public void testListDeliveringMessages() throws Exception { JMSQueueControl queueControl = createManagementControl(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index a17e8955d5..a4aec4a092 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -32,6 +32,7 @@ import org.junit.Test; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.Session; +import javax.management.openmbean.CompositeData; import java.util.Map; /** @@ -317,6 +318,21 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { proxy.invokeOperation("resume"); } + @Override + public CompositeData[] browse() throws Exception { + Map map = (Map) proxy.invokeOperation("browse"); + CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName()); + if (compositeDatas == null) { + compositeDatas = new CompositeData[0]; + } + return compositeDatas; + } + + @Override + public CompositeData[] browse(String filter) throws Exception { + return new CompositeData[0]; + } + @Override public String getSelector() { return (String) proxy.retrieveAttributeValue("selector"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSUtil.java index 6b15b23872..8bbcc1630b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSUtil.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSUtil.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.jms.server.management; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -156,6 +157,48 @@ public class JMSUtil { return message; } + public static BytesMessage sendByteMessage(final Session session, + final Destination destination, + final byte[] bytes) throws JMSException { + MessageProducer producer = session.createProducer(destination); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(bytes); + producer.send(message); + return message; + } + + public static Message sendMessageWithProperty(final Session session, + final Destination destination, + final String key, + final int value) throws JMSException { + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + message.setIntProperty(key, value); + producer.send(message); + return message; + } + + public static Message sendMessageWithProperty(final Session session, + final Destination destination, + final String key, + final String value) throws JMSException { + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + message.setStringProperty(key, value); + producer.send(message); + return message; + } + + public static Message sendMessageWithReplyTo(final Session session, + final Destination destination, + final String replyTo) throws JMSException { + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + message.setJMSReplyTo(ActiveMQJMSClient.createQueue(replyTo)); + producer.send(message); + return message; + } + public static void consumeMessages(final int expected, final Destination dest) throws JMSException { Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName()); try { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 6dce64bd79..15532559de 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -27,6 +27,8 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.junit.Before; +import javax.management.openmbean.CompositeData; + public class QueueControlUsingCoreTest extends QueueControlTest { protected ClientSession session; @@ -328,6 +330,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Boolean) proxy.invokeOperation("isPaused"); } + @Override + public CompositeData[] browse(String filter) throws Exception { + return null; + } + @Override public String listConsumersAsJSON() throws Exception { return (String) proxy.invokeOperation("listConsumersAsJSON");