This closes #310
This commit is contained in:
commit
bb40472f97
|
@ -579,4 +579,10 @@ public interface Message {
|
|||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
*/
|
||||
Map<String, Object> toMap();
|
||||
|
||||
|
||||
/**
|
||||
* @return Returns the message properties in Map form, useful when encoding to JSON
|
||||
*/
|
||||
Map<String, Object> toPropertyMap();
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.api.core.management;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -24,9 +26,13 @@ import java.util.Map;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.apache.activemq.artemis.utils.Base64;
|
||||
import org.apache.activemq.artemis.utils.json.JSONArray;
|
||||
import org.apache.activemq.artemis.utils.json.JSONObject;
|
||||
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
|
||||
/**
|
||||
* Helper class to use ActiveMQ Artemis Core messages to manage server resources.
|
||||
*/
|
||||
|
@ -178,7 +184,20 @@ public final class ManagementHelper {
|
|||
if (clz.isArray()) {
|
||||
Object[] innerArray = (Object[]) parameter;
|
||||
|
||||
jsonArray.put(ManagementHelper.toJSONArray(innerArray));
|
||||
if (innerArray instanceof CompositeData[]) {
|
||||
JSONArray jsonArray1 = new JSONArray();
|
||||
for (Object data : innerArray) {
|
||||
String s = Base64.encodeObject((CompositeDataSupport)data);
|
||||
jsonArray1.put(s);
|
||||
}
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put(CompositeData.class.getName(), jsonArray1);
|
||||
jsonArray.put(jsonObject);
|
||||
System.out.println("ManagementHelper.toJSONArray");
|
||||
}
|
||||
else {
|
||||
jsonArray.put(ManagementHelper.toJSONArray(innerArray));
|
||||
}
|
||||
}
|
||||
else {
|
||||
ManagementHelper.checkType(parameter);
|
||||
|
@ -235,6 +254,16 @@ public final class ManagementHelper {
|
|||
innerVal = ((Integer) innerVal).longValue();
|
||||
}
|
||||
|
||||
if (CompositeData.class.getName().equals(key)) {
|
||||
Object[] data = (Object[]) innerVal;
|
||||
CompositeData[] cds = new CompositeData[data.length];
|
||||
for (int i1 = 0; i1 < data.length; i1++) {
|
||||
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(Base64.decode((data[i1].toString()))));
|
||||
cds[i1] = (CompositeDataSupport) ois.readObject();
|
||||
}
|
||||
innerVal = cds;
|
||||
}
|
||||
|
||||
map.put(key, innerVal);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.api.core.management;
|
||||
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -377,6 +378,12 @@ public interface QueueControl {
|
|||
@Operation(desc = "Inspects if the queue is paused", impact = MBeanOperationInfo.INFO)
|
||||
boolean isPaused() throws Exception;
|
||||
|
||||
/**
|
||||
* Resets the MessagesAdded property
|
||||
*/
|
||||
@Operation(desc = "Browse Messages", impact = MBeanOperationInfo.ACTION)
|
||||
CompositeData[] browse(String filter) throws Exception;
|
||||
|
||||
/**
|
||||
* Resets the MessagesAdded property
|
||||
*/
|
||||
|
|
|
@ -420,6 +420,13 @@ public abstract class MessageImpl implements MessageInternal {
|
|||
map.put("expiration", expiration);
|
||||
map.put("timestamp", timestamp);
|
||||
map.put("priority", priority);
|
||||
map.putAll(toPropertyMap());
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toPropertyMap() {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
for (SimpleString propName : properties.getPropertyNames()) {
|
||||
map.put(propName.toString(), properties.getProperty(propName));
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management;
|
|||
import java.util.Map;
|
||||
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.Operation;
|
||||
import org.apache.activemq.artemis.api.core.management.Parameter;
|
||||
|
@ -297,6 +298,19 @@ public interface JMSQueueControl extends DestinationControl {
|
|||
@Operation(desc = "Resume the queue.", impact = MBeanOperationInfo.ACTION)
|
||||
void resume() throws Exception;
|
||||
|
||||
|
||||
/**
|
||||
* Resumes the queue. Messages are again delivered to its consumers.
|
||||
*/
|
||||
@Operation(desc = "Browse the queue.", impact = MBeanOperationInfo.ACTION)
|
||||
CompositeData[] browse() throws Exception;
|
||||
|
||||
/**
|
||||
* Resumes the queue. Messages are again delivered to its consumers.
|
||||
*/
|
||||
@Operation(desc = "Browse the queue.", impact = MBeanOperationInfo.ACTION)
|
||||
CompositeData[] browse(String filter) throws Exception;
|
||||
|
||||
@Operation(desc = "List all the existent consumers on the Queue")
|
||||
String listConsumersAsJSON() throws Exception;
|
||||
|
||||
|
|
|
@ -25,6 +25,8 @@ import javax.jms.JMSRuntimeException;
|
|||
import javax.jms.Message;
|
||||
import javax.jms.MessageFormatException;
|
||||
import javax.jms.MessageNotWriteableException;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.OutputStream;
|
||||
|
@ -93,6 +95,13 @@ public class ActiveMQMessage implements javax.jms.Message {
|
|||
return jmsMessage;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static CompositeData coreCompositeTypeToJMSCompositeType(CompositeDataSupport data) throws Exception {
|
||||
CompositeData jmsdata = new CompositeDataSupport(data.getCompositeType(), new HashMap<String, Object>());
|
||||
return jmsdata;
|
||||
}
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
private static final HashSet<String> reservedIdentifiers = new HashSet<>();
|
||||
|
|
|
@ -16,12 +16,17 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.jms.management.impl;
|
||||
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.management.MBeanInfo;
|
||||
import javax.management.StandardMBean;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
|
||||
import org.apache.activemq.artemis.api.core.FilterConstants;
|
||||
import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
|
||||
import org.apache.activemq.artemis.api.core.management.Operation;
|
||||
|
@ -33,6 +38,7 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper
|
|||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||
import org.apache.activemq.artemis.jms.client.SelectorTranslator;
|
||||
import org.apache.activemq.artemis.jms.management.impl.openmbean.JMSOpenTypeSupport;
|
||||
import org.apache.activemq.artemis.jms.server.JMSServerManager;
|
||||
import org.apache.activemq.artemis.utils.json.JSONArray;
|
||||
import org.apache.activemq.artemis.utils.json.JSONObject;
|
||||
|
@ -205,6 +211,10 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
|||
return jmsMessages;
|
||||
}
|
||||
|
||||
private CompositeData toJMSCompositeType(CompositeDataSupport data) throws Exception {
|
||||
return JMSOpenTypeSupport.convert(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object>[] listScheduledMessages() throws Exception {
|
||||
Map<String, Object>[] coreMessages = coreQueueControl.listScheduledMessages();
|
||||
|
@ -405,6 +415,30 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
|||
coreQueueControl.resume();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeData[] browse() throws Exception {
|
||||
return browse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeData[] browse(String filter) throws Exception {
|
||||
try {
|
||||
CompositeData[] messages = coreQueueControl.browse(filter);
|
||||
|
||||
ArrayList<CompositeData> c = new ArrayList<>();
|
||||
|
||||
for (CompositeData message : messages) {
|
||||
c.add(toJMSCompositeType((CompositeDataSupport) message));
|
||||
}
|
||||
CompositeData[] rc = new CompositeData[c.size()];
|
||||
c.toArray(rc);
|
||||
return rc;
|
||||
}
|
||||
catch (ActiveMQInvalidFilterExpressionException e) {
|
||||
throw new InvalidSelectorException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSelector() {
|
||||
return coreQueueControl.getFilter();
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.management.impl.openmbean;
|
||||
|
||||
public interface JMSCompositeDataConstants {
|
||||
String JMS_DESTINATION = "JMSDestination";
|
||||
String JMS_MESSAGE_ID = "JMSMessageID";
|
||||
String JMS_TYPE = "JMSType";
|
||||
String JMS_DELIVERY_MODE = "JMSDeliveryMode";
|
||||
String JMS_EXPIRATION = "JMSExpiration";
|
||||
String JMS_PRIORITY = "JMSPriority";
|
||||
String JMS_REDELIVERED = "JMSRedelivered";
|
||||
String JMS_TIMESTAMP = "JMSTimestamp";
|
||||
String JMSXGROUP_SEQ = "JMSXGroupSeq";
|
||||
String JMSXGROUP_ID = "JMSXGroupID";
|
||||
String JMSXUSER_ID = "JMSXUserID";
|
||||
String JMS_CORRELATION_ID = "JMSCorrelationID";
|
||||
String ORIGINAL_DESTINATION = "OriginalDestination";
|
||||
String JMS_REPLY_TO = "JMSReplyTo";
|
||||
|
||||
String JMS_DESTINATION_DESCRIPTION = "The message destination";
|
||||
String JMS_MESSAGE_ID_DESCRIPTION = "The message ID";
|
||||
String JMS_TYPE_DESCRIPTION = "The message type";
|
||||
String JMS_DELIVERY_MODE_DESCRIPTION = "The message delivery mode";
|
||||
String JMS_EXPIRATION_DESCRIPTION = "The message expiration";
|
||||
String JMS_PRIORITY_DESCRIPTION = "The message priority";
|
||||
String JMS_REDELIVERED_DESCRIPTION = "Is the message redelivered";
|
||||
String JMS_TIMESTAMP_DESCRIPTION = "The message timestamp";
|
||||
String JMSXGROUP_SEQ_DESCRIPTION = "The message group sequence number";
|
||||
String JMSXGROUP_ID_DESCRIPTION = "The message group ID";
|
||||
String JMSXUSER_ID_DESCRIPTION = "The user that sent the message";
|
||||
String JMS_CORRELATION_ID_DESCRIPTION = "The message correlation ID";
|
||||
String ORIGINAL_DESTINATION_DESCRIPTION = "Original Destination Before Senting To DLQ";
|
||||
String JMS_REPLY_TO_DESCRIPTION = "The reply to address";
|
||||
|
||||
String BODY_LENGTH = "BodyLength";
|
||||
String BODY_PREVIEW = "BodyPreview";
|
||||
String CONTENT_MAP = "ContentMap";
|
||||
String MESSAGE_TEXT = "Text";
|
||||
String MESSAGE_URL = "Url";
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,353 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.management.impl.openmbean;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants;
|
||||
import org.apache.activemq.artemis.reader.MapMessageUtil;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
|
||||
import javax.management.openmbean.ArrayType;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.OpenDataException;
|
||||
import javax.management.openmbean.OpenType;
|
||||
import javax.management.openmbean.SimpleType;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class JMSOpenTypeSupport {
|
||||
|
||||
public interface OpenTypeFactory {
|
||||
CompositeType getCompositeType() throws OpenDataException;
|
||||
|
||||
Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException;
|
||||
}
|
||||
|
||||
private static final Map<Byte, AbstractOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<>();
|
||||
|
||||
public abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
|
||||
|
||||
private CompositeType compositeType;
|
||||
private final List<String> itemNamesList = new ArrayList<String>();
|
||||
private final List<String> itemDescriptionsList = new ArrayList<String>();
|
||||
private final List<OpenType> itemTypesList = new ArrayList<OpenType>();
|
||||
|
||||
public CompositeType getCompositeType() throws OpenDataException {
|
||||
if (compositeType == null) {
|
||||
init();
|
||||
compositeType = createCompositeType();
|
||||
}
|
||||
return compositeType;
|
||||
}
|
||||
|
||||
protected void init() throws OpenDataException {
|
||||
}
|
||||
|
||||
protected CompositeType createCompositeType() throws OpenDataException {
|
||||
String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
|
||||
String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
|
||||
OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
|
||||
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
|
||||
}
|
||||
|
||||
protected abstract String getTypeName();
|
||||
|
||||
protected void addItem(String name, String description, OpenType type) {
|
||||
itemNamesList.add(name);
|
||||
itemDescriptionsList.add(description);
|
||||
itemTypesList.add(type);
|
||||
}
|
||||
|
||||
protected String getDescription() {
|
||||
return getTypeName();
|
||||
}
|
||||
|
||||
public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
|
||||
Map<String, Object> rc = new HashMap<String, Object>();
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
|
||||
protected TabularType stringPropertyTabularType;
|
||||
protected TabularType booleanPropertyTabularType;
|
||||
protected TabularType bytePropertyTabularType;
|
||||
protected TabularType shortPropertyTabularType;
|
||||
protected TabularType intPropertyTabularType;
|
||||
protected TabularType longPropertyTabularType;
|
||||
protected TabularType floatPropertyTabularType;
|
||||
protected TabularType doublePropertyTabularType;
|
||||
|
||||
protected ArrayType body;
|
||||
|
||||
@Override
|
||||
protected String getTypeName() {
|
||||
return Message.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init() throws OpenDataException {
|
||||
super.init();
|
||||
|
||||
addItem(JMSCompositeDataConstants.JMS_DESTINATION, JMSCompositeDataConstants.JMS_DESTINATION_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMS_MESSAGE_ID, JMSCompositeDataConstants.JMS_MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMS_TYPE, JMSCompositeDataConstants.JMS_TYPE_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMS_DELIVERY_MODE, JMSCompositeDataConstants.JMS_DELIVERY_MODE_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMS_EXPIRATION, JMSCompositeDataConstants.JMS_EXPIRATION_DESCRIPTION, SimpleType.LONG);
|
||||
addItem(JMSCompositeDataConstants.JMS_PRIORITY, JMSCompositeDataConstants.JMS_PRIORITY_DESCRIPTION, SimpleType.INTEGER);
|
||||
addItem(JMSCompositeDataConstants.JMS_REDELIVERED, JMSCompositeDataConstants.JMS_REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
|
||||
addItem(JMSCompositeDataConstants.JMS_TIMESTAMP, JMSCompositeDataConstants.JMS_TIMESTAMP_DESCRIPTION, SimpleType.DATE);
|
||||
addItem(JMSCompositeDataConstants.JMSXGROUP_ID, JMSCompositeDataConstants.JMSXGROUP_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ_DESCRIPTION, SimpleType.INTEGER);
|
||||
addItem(JMSCompositeDataConstants.JMSXUSER_ID, JMSCompositeDataConstants.JMSXUSER_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.JMS_REPLY_TO, JMSCompositeDataConstants.JMS_REPLY_TO_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(JMSCompositeDataConstants.ORIGINAL_DESTINATION, JMSCompositeDataConstants.ORIGINAL_DESTINATION_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
|
||||
|
||||
// now lets expose the type safe properties
|
||||
stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
|
||||
booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
|
||||
bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
|
||||
shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
|
||||
intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
|
||||
longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
|
||||
floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
|
||||
doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
|
||||
|
||||
addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
|
||||
addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
|
||||
addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
|
||||
addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
|
||||
addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
|
||||
addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
|
||||
addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
|
||||
addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(data);
|
||||
putString(rc, data, JMSCompositeDataConstants.JMS_MESSAGE_ID, CompositeDataConstants.USER_ID);
|
||||
putString(rc, data, JMSCompositeDataConstants.JMS_DESTINATION, CompositeDataConstants.ADDRESS);
|
||||
putStringProperty(rc, data, JMSCompositeDataConstants.JMS_REPLY_TO, "JMSReplyTo");
|
||||
rc.put(JMSCompositeDataConstants.JMS_TYPE, getType());
|
||||
rc.put(JMSCompositeDataConstants.JMS_DELIVERY_MODE, ((Boolean)data.get(CompositeDataConstants.DURABLE)) ? "PERSISTENT" : "NON-PERSISTENT");
|
||||
rc.put(JMSCompositeDataConstants.JMS_EXPIRATION, data.get(CompositeDataConstants.EXPIRATION));
|
||||
rc.put(JMSCompositeDataConstants.JMS_TIMESTAMP, new Date((Long) data.get(CompositeDataConstants.TIMESTAMP)));
|
||||
rc.put(JMSCompositeDataConstants.JMS_PRIORITY, ((Byte) data.get(CompositeDataConstants.PRIORITY)).intValue());
|
||||
putStringProperty(rc, data, JMSCompositeDataConstants.JMS_CORRELATION_ID, JMSCompositeDataConstants.JMS_CORRELATION_ID);
|
||||
rc.put(JMSCompositeDataConstants.JMS_REDELIVERED, data.get(CompositeDataConstants.REDELIVERED));
|
||||
putStringProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_ID, Message.HDR_GROUP_ID.toString());
|
||||
putIntProperty(rc, data, JMSCompositeDataConstants.JMSXGROUP_SEQ, JMSCompositeDataConstants.JMSXGROUP_SEQ);
|
||||
putStringProperty(rc, data, JMSCompositeDataConstants.JMSXUSER_ID, JMSCompositeDataConstants.JMSXUSER_ID);
|
||||
putStringProperty(rc, data, JMSCompositeDataConstants.ORIGINAL_DESTINATION, Message.HDR_ORIGINAL_ADDRESS.toString());
|
||||
|
||||
rc.put(CompositeDataConstants.PROPERTIES, "" + data.get(CompositeDataConstants.PROPERTIES));
|
||||
|
||||
rc.put(CompositeDataConstants.STRING_PROPERTIES, data.get(CompositeDataConstants.STRING_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, data.get(CompositeDataConstants.BOOLEAN_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.BYTE_PROPERTIES, data.get(CompositeDataConstants.BYTE_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.SHORT_PROPERTIES, data.get(CompositeDataConstants.SHORT_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.INT_PROPERTIES, data.get(CompositeDataConstants.INT_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.LONG_PROPERTIES, data.get(CompositeDataConstants.LONG_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, data.get(CompositeDataConstants.FLOAT_PROPERTIES));
|
||||
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, data.get(CompositeDataConstants.DOUBLE_PROPERTIES));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
private void putString(Map<String, Object> rc, CompositeDataSupport data, String target, String source) {
|
||||
String prop = (String) data.get(source);
|
||||
if (prop != null) {
|
||||
rc.put(target, prop);
|
||||
}
|
||||
else {
|
||||
rc.put(target, "");
|
||||
}
|
||||
}
|
||||
|
||||
private void putStringProperty(Map<String, Object> rc, CompositeDataSupport data, String target, String source) {
|
||||
TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.STRING_PROPERTIES);
|
||||
Object[] keys = new Object[]{source};
|
||||
CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys);
|
||||
String prop = "";
|
||||
if (cds != null && cds.get("value") != null) {
|
||||
prop = (String) cds.get("value");
|
||||
}
|
||||
rc.put(target, prop);
|
||||
}
|
||||
|
||||
private void putIntProperty(Map<String, Object> rc, CompositeDataSupport data, String target, String source) {
|
||||
TabularDataSupport properties = (TabularDataSupport) data.get(CompositeDataConstants.INT_PROPERTIES);
|
||||
Object[] keys = new Object[]{source};
|
||||
CompositeDataSupport cds = (CompositeDataSupport) properties.get(keys);
|
||||
Integer prop = 0;
|
||||
if (cds != null && cds.get("value") != null) {
|
||||
prop = (Integer) cds.get("value");
|
||||
}
|
||||
rc.put(target, prop);
|
||||
}
|
||||
|
||||
private String getType() {
|
||||
return "Message";
|
||||
}
|
||||
|
||||
protected String toString(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
return value.toString();
|
||||
}
|
||||
|
||||
|
||||
protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
|
||||
String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
|
||||
String[] keyValue = new String[]{"key", "value"};
|
||||
OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
|
||||
CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
|
||||
return new TabularType(typeName, typeName, rowType, new String[]{"key"});
|
||||
}
|
||||
}
|
||||
|
||||
static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
|
||||
|
||||
@Override
|
||||
protected String getTypeName() {
|
||||
return "BytesMessage";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init() throws OpenDataException {
|
||||
super.init();
|
||||
addItem(JMSCompositeDataConstants.BODY_LENGTH, "Body length", SimpleType.LONG);
|
||||
addItem(JMSCompositeDataConstants.BODY_PREVIEW, "Body preview", new ArrayType(SimpleType.BYTE, true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(data);
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body"));
|
||||
long length = 0;
|
||||
length = buffer.readableBytes();
|
||||
rc.put(JMSCompositeDataConstants.BODY_LENGTH, Long.valueOf(length));
|
||||
byte[] preview = new byte[(int) Math.min(length, 255)];
|
||||
buffer.readBytes(preview);
|
||||
rc.put(JMSCompositeDataConstants.BODY_PREVIEW, preview);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
|
||||
@Override
|
||||
protected String getTypeName() {
|
||||
return "MapMessage";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init() throws OpenDataException {
|
||||
super.init();
|
||||
addItem(JMSCompositeDataConstants.CONTENT_MAP, "Content map", SimpleType.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(data);
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body"));
|
||||
TypedProperties properties = new TypedProperties();
|
||||
MapMessageUtil.readBodyMap(buffer, properties);
|
||||
rc.put(JMSCompositeDataConstants.CONTENT_MAP, "" + properties.getMap());
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
@Override
|
||||
protected String getTypeName() {
|
||||
return "ObjectMessage";
|
||||
}
|
||||
}
|
||||
static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
@Override
|
||||
protected String getTypeName() {
|
||||
return "StreamMessage";
|
||||
}
|
||||
}
|
||||
|
||||
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
|
||||
@Override
|
||||
protected String getTypeName() {
|
||||
return "TextMessage";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init() throws OpenDataException {
|
||||
super.init();
|
||||
addItem(JMSCompositeDataConstants.MESSAGE_TEXT, JMSCompositeDataConstants.MESSAGE_TEXT, SimpleType.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(CompositeDataSupport data) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(data);
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer((byte[]) data.get("body"));
|
||||
SimpleString value = buffer.readNullableSimpleString();
|
||||
rc.put(JMSCompositeDataConstants.MESSAGE_TEXT, value != null ? value.toString() : "");
|
||||
return rc;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static {
|
||||
OPEN_TYPE_FACTORIES.put(Message.DEFAULT_TYPE, new MessageOpenTypeFactory());
|
||||
OPEN_TYPE_FACTORIES.put(Message.TEXT_TYPE, new TextMessageOpenTypeFactory());
|
||||
OPEN_TYPE_FACTORIES.put(Message.BYTES_TYPE, new ByteMessageOpenTypeFactory());
|
||||
OPEN_TYPE_FACTORIES.put(Message.MAP_TYPE, new MapMessageOpenTypeFactory());
|
||||
OPEN_TYPE_FACTORIES.put(Message.OBJECT_TYPE, new ObjectMessageOpenTypeFactory());
|
||||
OPEN_TYPE_FACTORIES.put(Message.STREAM_TYPE, new StreamMessageOpenTypeFactory());
|
||||
}
|
||||
|
||||
private JMSOpenTypeSupport() {
|
||||
}
|
||||
|
||||
public static OpenTypeFactory getFactory(Byte type) throws OpenDataException {
|
||||
return OPEN_TYPE_FACTORIES.get(type);
|
||||
}
|
||||
|
||||
public static CompositeData convert(CompositeDataSupport data) throws OpenDataException {
|
||||
OpenTypeFactory f = getFactory((Byte) data.get("type"));
|
||||
if (f == null) {
|
||||
throw new OpenDataException("Cannot create a CompositeData for type: " + data.get("type"));
|
||||
}
|
||||
CompositeType ct = f.getCompositeType();
|
||||
Map<String, Object> fields = f.getFields(data);
|
||||
return new CompositeDataSupport(ct, fields);
|
||||
}
|
||||
|
||||
}
|
|
@ -161,6 +161,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String AUTO_DELETE_JMS_QUEUES = "auto-delete-jms-queues";
|
||||
|
||||
private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size";
|
||||
|
||||
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
||||
|
||||
private static final String MAX_QUEUES_NODE_NAME = "max-queues";
|
||||
|
@ -778,6 +780,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
else if (AUTO_DELETE_JMS_QUEUES.equalsIgnoreCase(name)) {
|
||||
addressSettings.setAutoDeleteJmsQueues(XMLUtil.parseBoolean(child));
|
||||
}
|
||||
else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
|
||||
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
|
||||
}
|
||||
}
|
||||
return setting;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.core.management.impl;
|
||||
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
|
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
|
|||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
|
||||
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
|
||||
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
|
||||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
|
@ -855,6 +857,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeData[] browse(String filterStr) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
int pageSize = addressSettingsRepository.getMatch(queue.getName().toString()).getManagementBrowsePageSize();
|
||||
int currentPageSize = 0;
|
||||
ArrayList<CompositeData> c = new ArrayList<>();
|
||||
Filter filter = FilterImpl.createFilter(filterStr);
|
||||
queue.flushExecutor();
|
||||
LinkedListIterator<MessageReference> iterator = queue.totalIterator();
|
||||
try {
|
||||
while (iterator.hasNext() && currentPageSize++ < pageSize) {
|
||||
MessageReference ref = iterator.next();
|
||||
if (filter == null || filter.match(ref.getMessage())) {
|
||||
c.add(OpenTypeSupport.convert(ref));
|
||||
|
||||
}
|
||||
}
|
||||
CompositeData[] rc = new CompositeData[c.size()];
|
||||
c.toArray(rc);
|
||||
return rc;
|
||||
}
|
||||
finally {
|
||||
iterator.close();
|
||||
}
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
throw new IllegalStateException(e.getMessage());
|
||||
}
|
||||
finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushExecutor() {
|
||||
checkStarted();
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.management.impl.openmbean;
|
||||
|
||||
public interface CompositeDataConstants {
|
||||
|
||||
String ADDRESS = "address";
|
||||
String MESSAGE_ID = "messageID";
|
||||
String USER_ID = "userID";
|
||||
String TYPE = "type";
|
||||
String DURABLE = "durable";
|
||||
String EXPIRATION = "expiration";
|
||||
String PRIORITY = "priority";
|
||||
String REDELIVERED = "redelivered";
|
||||
String TIMESTAMP = "timestamp";
|
||||
String BODY = "body";
|
||||
String PROPERTIES = "PropertiesText";
|
||||
|
||||
String ADDRESS_DESCRIPTION = "The Address";
|
||||
String MESSAGE_ID_DESCRIPTION = " The message ID";
|
||||
String USER_ID_DESCRIPTION = "The user ID";
|
||||
String TYPE_DESCRIPTION = "The message type";
|
||||
String DURABLE_DESCRIPTION = "Is the message durable";
|
||||
String EXPIRATION_DESCRIPTION = "The message expiration";
|
||||
String PRIORITY_DESCRIPTION = "The message priority";
|
||||
String REDELIVERED_DESCRIPTION = "Has the message been redelivered";
|
||||
String TIMESTAMP_DESCRIPTION = "The message timestamp";
|
||||
String BODY_DESCRIPTION = "The message body";
|
||||
String PROPERTIES_DESCRIPTION = "The properties text";
|
||||
|
||||
|
||||
// User properties
|
||||
String STRING_PROPERTIES = "StringProperties";
|
||||
String BOOLEAN_PROPERTIES = "BooleanProperties";
|
||||
String BYTE_PROPERTIES = "ByteProperties";
|
||||
String SHORT_PROPERTIES = "ShortProperties";
|
||||
String INT_PROPERTIES = "IntProperties";
|
||||
String LONG_PROPERTIES = "LongProperties";
|
||||
String FLOAT_PROPERTIES = "FloatProperties";
|
||||
String DOUBLE_PROPERTIES = "DoubleProperties";
|
||||
|
||||
String STRING_PROPERTIES_DESCRIPTION = "User String Properties";
|
||||
String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties";
|
||||
String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties";
|
||||
String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties";
|
||||
String INT_PROPERTIES_DESCRIPTION = "User Int Properties";
|
||||
String LONG_PROPERTIES_DESCRIPTION = "User Long Properties";
|
||||
String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties";
|
||||
String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties";
|
||||
|
||||
}
|
|
@ -0,0 +1,256 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.management.impl.openmbean;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
||||
import javax.management.openmbean.ArrayType;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.OpenDataException;
|
||||
import javax.management.openmbean.OpenType;
|
||||
import javax.management.openmbean.SimpleType;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class OpenTypeSupport {
|
||||
|
||||
private static MessageOpenTypeFactory FACTORY = new MessageOpenTypeFactory();
|
||||
|
||||
private OpenTypeSupport() {
|
||||
}
|
||||
|
||||
public static CompositeData convert(MessageReference ref) throws OpenDataException {
|
||||
CompositeType ct = FACTORY.getCompositeType();
|
||||
Map<String, Object> fields = FACTORY.getFields(ref);
|
||||
return new CompositeDataSupport(ct, fields);
|
||||
}
|
||||
|
||||
|
||||
static class MessageOpenTypeFactory {
|
||||
private CompositeType compositeType;
|
||||
private final List<String> itemNamesList = new ArrayList<String>();
|
||||
private final List<String> itemDescriptionsList = new ArrayList<String>();
|
||||
private final List<OpenType> itemTypesList = new ArrayList<OpenType>();
|
||||
|
||||
protected TabularType stringPropertyTabularType;
|
||||
protected TabularType booleanPropertyTabularType;
|
||||
protected TabularType bytePropertyTabularType;
|
||||
protected TabularType shortPropertyTabularType;
|
||||
protected TabularType intPropertyTabularType;
|
||||
protected TabularType longPropertyTabularType;
|
||||
protected TabularType floatPropertyTabularType;
|
||||
protected TabularType doublePropertyTabularType;
|
||||
|
||||
protected ArrayType body;
|
||||
|
||||
protected String getTypeName() {
|
||||
return Message.class.getName();
|
||||
}
|
||||
|
||||
public CompositeType getCompositeType() throws OpenDataException {
|
||||
if (compositeType == null) {
|
||||
init();
|
||||
compositeType = createCompositeType();
|
||||
}
|
||||
return compositeType;
|
||||
}
|
||||
|
||||
protected void init() throws OpenDataException {
|
||||
|
||||
addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
|
||||
addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
|
||||
addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
|
||||
addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
|
||||
addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
|
||||
addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
|
||||
|
||||
addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
|
||||
|
||||
// now lets expose the type safe properties
|
||||
stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
|
||||
booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
|
||||
bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
|
||||
shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
|
||||
intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
|
||||
longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
|
||||
floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
|
||||
doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
|
||||
|
||||
body = new ArrayType(SimpleType.BYTE, true);
|
||||
|
||||
addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
|
||||
|
||||
addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
|
||||
addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
|
||||
addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
|
||||
addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
|
||||
addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
|
||||
addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
|
||||
addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
|
||||
addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
|
||||
}
|
||||
|
||||
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
|
||||
Map<String, Object> rc = new HashMap<>();
|
||||
Message m = ref.getMessage();
|
||||
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
|
||||
if (m.getUserID() != null) {
|
||||
rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
|
||||
}
|
||||
else {
|
||||
rc.put(CompositeDataConstants.USER_ID, "");
|
||||
}
|
||||
rc.put(CompositeDataConstants.ADDRESS, m.getAddress().toString());
|
||||
rc.put(CompositeDataConstants.TYPE, m.getType());
|
||||
rc.put(CompositeDataConstants.DURABLE, m.isDurable());
|
||||
rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
|
||||
rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
|
||||
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
|
||||
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
|
||||
|
||||
ActiveMQBuffer bodyCopy = m.getBodyBufferCopy();
|
||||
byte[] bytes = new byte[bodyCopy.readableBytes()];
|
||||
bodyCopy.readBytes(bytes);
|
||||
rc.put(CompositeDataConstants.BODY, bytes);
|
||||
|
||||
Map<String, Object> propertyMap = m.toPropertyMap();
|
||||
|
||||
rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
|
||||
|
||||
try {
|
||||
rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(propertyMap, stringPropertyTabularType, String.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(propertyMap, booleanPropertyTabularType, Boolean.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(propertyMap, bytePropertyTabularType, Byte.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(propertyMap, shortPropertyTabularType, Short.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(propertyMap, intPropertyTabularType, Integer.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(propertyMap, longPropertyTabularType, Long.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(propertyMap, floatPropertyTabularType, Float.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(propertyMap, doublePropertyTabularType, Double.class));
|
||||
}
|
||||
catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType));
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
protected String toString(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
return value.toString();
|
||||
}
|
||||
|
||||
protected CompositeType createCompositeType() throws OpenDataException {
|
||||
String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
|
||||
String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
|
||||
OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
|
||||
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
|
||||
}
|
||||
|
||||
protected String getDescription() {
|
||||
return getTypeName();
|
||||
}
|
||||
|
||||
protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
|
||||
String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
|
||||
String[] keyValue = new String[]{"key", "value"};
|
||||
OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
|
||||
CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
|
||||
return new TabularType(typeName, typeName, rowType, new String[]{"key"});
|
||||
}
|
||||
|
||||
protected TabularDataSupport createTabularData(Map<String, Object> entries, TabularType type, Class valueType) throws IOException, OpenDataException {
|
||||
TabularDataSupport answer = new TabularDataSupport(type);
|
||||
|
||||
for (String key : entries.keySet()) {
|
||||
Object value = entries.get(key);
|
||||
if (valueType.isInstance(value)) {
|
||||
CompositeDataSupport compositeData = createTabularRowValue(type, key, value);
|
||||
answer.put(compositeData);
|
||||
}
|
||||
else if (valueType == String.class && value instanceof SimpleString) {
|
||||
CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString());
|
||||
answer.put(compositeData);
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected CompositeDataSupport createTabularRowValue(TabularType type, String key, Object value) throws OpenDataException {
|
||||
Map<String, Object> fields = new HashMap<String, Object>();
|
||||
fields.put("key", key);
|
||||
fields.put("value", value);
|
||||
return new CompositeDataSupport(type.getRowType(), fields);
|
||||
}
|
||||
|
||||
|
||||
protected void addItem(String name, String description, OpenType type) {
|
||||
itemNamesList.add(name);
|
||||
itemDescriptionsList.add(description);
|
||||
itemTypesList.add(type);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -66,6 +66,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final long DEFAULT_SLOW_CONSUMER_CHECK_PERIOD = 5;
|
||||
|
||||
public static final int MANAGEMENT_BROWSE_PAGE_SIZE = 200;
|
||||
|
||||
public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
|
||||
|
||||
private AddressFullMessagePolicy addressFullMessagePolicy = null;
|
||||
|
@ -110,6 +112,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Boolean autoDeleteJmsQueues = null;
|
||||
|
||||
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
|
||||
|
||||
public AddressSettings(AddressSettings other) {
|
||||
this.addressFullMessagePolicy = other.addressFullMessagePolicy;
|
||||
this.maxSizeBytes = other.maxSizeBytes;
|
||||
|
@ -132,6 +136,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.slowConsumerPolicy = other.slowConsumerPolicy;
|
||||
this.autoCreateJmsQueues = other.autoCreateJmsQueues;
|
||||
this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
|
||||
this.managementBrowsePageSize = other.managementBrowsePageSize;
|
||||
}
|
||||
|
||||
public AddressSettings() {
|
||||
|
@ -319,6 +324,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public int getManagementBrowsePageSize() {
|
||||
return managementBrowsePageSize != null ? managementBrowsePageSize : AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
|
||||
}
|
||||
|
||||
public AddressSettings setManagementBrowsePageSize(int managementBrowsePageSize) {
|
||||
this.managementBrowsePageSize = managementBrowsePageSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* merge 2 objects in to 1
|
||||
*
|
||||
|
@ -386,6 +400,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (autoDeleteJmsQueues == null) {
|
||||
autoDeleteJmsQueues = merged.autoDeleteJmsQueues;
|
||||
}
|
||||
if (managementBrowsePageSize == null) {
|
||||
managementBrowsePageSize = merged.managementBrowsePageSize;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -445,6 +462,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
autoCreateJmsQueues = BufferHelper.readNullableBoolean(buffer);
|
||||
|
||||
autoDeleteJmsQueues = BufferHelper.readNullableBoolean(buffer);
|
||||
|
||||
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -470,7 +489,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.sizeOfNullableLong(slowConsumerThreshold) +
|
||||
BufferHelper.sizeOfNullableSimpleString(slowConsumerPolicy != null ? slowConsumerPolicy.toString() : null) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoCreateJmsQueues) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues);
|
||||
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
|
||||
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -516,6 +536,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.writeNullableBoolean(buffer, autoCreateJmsQueues);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsQueues);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -546,6 +568,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((slowConsumerPolicy == null) ? 0 : slowConsumerPolicy.hashCode());
|
||||
result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
|
||||
result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
|
||||
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -687,6 +710,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
}
|
||||
else if (!autoDeleteJmsQueues.equals(other.autoDeleteJmsQueues))
|
||||
return false;
|
||||
else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
|
||||
return false;
|
||||
if (managementBrowsePageSize == null) {
|
||||
if (other.managementBrowsePageSize != null)
|
||||
return false;
|
||||
}
|
||||
else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -736,6 +767,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
autoCreateJmsQueues +
|
||||
", autoDeleteJmsQueues=" +
|
||||
autoDeleteJmsQueues +
|
||||
", managementBrowsePageSize=" +
|
||||
managementBrowsePageSize +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2189,6 +2189,16 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1"
|
||||
minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how many message a management resource can browse
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
</xsd:all>
|
||||
|
||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||
|
|
|
@ -834,6 +834,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toPropertyMap() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FakeMessage writeBodyBufferBytes(byte[] bytes) {
|
||||
return this;
|
||||
|
|
|
@ -691,6 +691,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toPropertyMap() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FakeMessageWithID writeBodyBufferBytes(byte[] bytes) {
|
||||
return this;
|
||||
|
|
|
@ -16,15 +16,19 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jms.server.management;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.Notification;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.naming.Context;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -56,6 +60,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
|||
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
|
||||
import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
|
||||
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
|
||||
|
@ -150,6 +155,211 @@ public class JMSQueueControlTest extends ManagementTestBase {
|
|||
Assert.assertEquals(0, data.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrowseMessagesWithNullFilter() throws Exception {
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
String[] ids = JMSUtil.sendMessages(queue, 2);
|
||||
|
||||
Assert.assertEquals(2, getMessageCount(queueControl));
|
||||
|
||||
CompositeData[] data = queueControl.browse();
|
||||
Assert.assertEquals(2, data.length);
|
||||
System.out.println(data[0]);
|
||||
Assert.assertEquals(ids[0], data[0].get("JMSMessageID").toString());
|
||||
Assert.assertEquals(ids[1], data[1].get("JMSMessageID").toString());
|
||||
|
||||
JMSUtil.consumeMessages(2, queue);
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(0, data.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrowseMessagesWithNullFilterReplyTo() throws Exception {
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Message m = JMSUtil.sendMessageWithReplyTo(session, queue, "foo");
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
CompositeData[] data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("JMSReplyTo"));
|
||||
Assert.assertEquals("jms.queue.foo", data[0].get("JMSReplyTo"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(0, data.length);
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrowseMessagesWithAllProps() throws Exception {
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
JMSUtil.sendMessageWithProperty(session, queue, MessageUtil.CORRELATIONID_HEADER_NAME.toString(), "foo");
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
CompositeData[] data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("JMSCorrelationID"));
|
||||
Assert.assertEquals("foo", data[0].get("JMSCorrelationID"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
JMSUtil.sendMessageWithProperty(session, queue, MessageUtil.JMSXGROUPID.toString(), "myGroupID");
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("JMSXGroupID"));
|
||||
Assert.assertEquals("myGroupID", data[0].get("JMSXGroupID"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
JMSUtil.sendMessageWithProperty(session, queue, "JMSXGroupSeq", 33);
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("JMSXGroupID"));
|
||||
Assert.assertEquals(33, data[0].get("JMSXGroupSeq"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
JMSUtil.sendMessageWithProperty(session, queue, "JMSXUserID", "theheadhonch");
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("JMSXUserID"));
|
||||
Assert.assertEquals("theheadhonch", data[0].get("JMSXUserID"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(0, data.length);
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrowseBytesMessages() throws Exception {
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
||||
|
||||
Connection conn = cf.createConnection();
|
||||
conn.start();
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
byte[] bytes = RandomUtil.randomBytes(201);
|
||||
|
||||
BytesMessage message = JMSUtil.sendByteMessage(session, queue, bytes);
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
CompositeData[] data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("BodyLength"));
|
||||
Assert.assertEquals(201L, data[0].get("BodyLength"));
|
||||
Assert.assertNotNull(data[0].get("BodyPreview"));
|
||||
Assert.assertArrayEquals(message.getBody(byte[].class), (byte[]) data[0].get("BodyPreview"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
bytes = RandomUtil.randomBytes(301);
|
||||
|
||||
message = JMSUtil.sendByteMessage(session, queue, bytes);
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
Assert.assertNotNull(data[0].get("BodyLength"));
|
||||
Assert.assertEquals(301L, data[0].get("BodyLength"));
|
||||
Assert.assertNotNull(data[0].get("BodyPreview"));
|
||||
byte[] body = message.getBody(byte[].class);
|
||||
Assert.assertArrayEquals(Arrays.copyOf(body, 255), (byte[]) data[0].get("BodyPreview"));
|
||||
System.out.println(data[0]);
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(0, data.length);
|
||||
conn.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBrowseMapMessages() throws Exception {
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
||||
|
||||
Connection conn = cf.createConnection();
|
||||
conn.start();
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
MapMessage message = session.createMapMessage();
|
||||
message.setString("stringP", "aStringP");
|
||||
message.setBoolean("booleanP", true);
|
||||
message.setByte("byteP", (byte) 1);
|
||||
message.setChar("charP", 'q');
|
||||
message.setDouble("doubleP", 3.2);
|
||||
message.setFloat("floatP", 4.5F);
|
||||
message.setInt("intP", 8);
|
||||
message.setLong("longP", 7);
|
||||
message.setShort("shortP", (short) 777);
|
||||
producer.send(message);
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
|
||||
CompositeData[] data = queueControl.browse();
|
||||
Assert.assertEquals(1, data.length);
|
||||
String contentMap = (String) data[0].get("ContentMap");
|
||||
Assert.assertNotNull(contentMap);
|
||||
Assert.assertTrue(contentMap.contains("intP=8"));
|
||||
Assert.assertTrue(contentMap.contains("floatP=4.5"));
|
||||
Assert.assertTrue(contentMap.contains("longP=7"));
|
||||
Assert.assertTrue(contentMap.contains("charP=q"));
|
||||
Assert.assertTrue(contentMap.contains("byteP=1"));
|
||||
Assert.assertTrue(contentMap.contains("doubleP=3.2"));
|
||||
Assert.assertTrue(contentMap.contains("stringP=aStringP"));
|
||||
Assert.assertTrue(contentMap.contains("booleanP=true"));
|
||||
Assert.assertTrue(contentMap.contains("shortP=777"));
|
||||
System.out.println(data[0]);
|
||||
|
||||
JMSUtil.consumeMessages(1, queue);
|
||||
|
||||
data = queueControl.browse();
|
||||
Assert.assertEquals(0, data.length);
|
||||
conn.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListDeliveringMessages() throws Exception {
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.junit.Test;
|
|||
import javax.jms.QueueConnection;
|
||||
import javax.jms.QueueSession;
|
||||
import javax.jms.Session;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -317,6 +318,21 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
|
|||
proxy.invokeOperation("resume");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeData[] browse() throws Exception {
|
||||
Map map = (Map) proxy.invokeOperation("browse");
|
||||
CompositeData[] compositeDatas = (CompositeData[]) map.get(CompositeData.class.getName());
|
||||
if (compositeDatas == null) {
|
||||
compositeDatas = new CompositeData[0];
|
||||
}
|
||||
return compositeDatas;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeData[] browse(String filter) throws Exception {
|
||||
return new CompositeData[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSelector() {
|
||||
return (String) proxy.retrieveAttributeValue("selector");
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.jms.server.management;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
|
@ -156,6 +157,48 @@ public class JMSUtil {
|
|||
return message;
|
||||
}
|
||||
|
||||
public static BytesMessage sendByteMessage(final Session session,
|
||||
final Destination destination,
|
||||
final byte[] bytes) throws JMSException {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
BytesMessage message = session.createBytesMessage();
|
||||
message.writeBytes(bytes);
|
||||
producer.send(message);
|
||||
return message;
|
||||
}
|
||||
|
||||
public static Message sendMessageWithProperty(final Session session,
|
||||
final Destination destination,
|
||||
final String key,
|
||||
final int value) throws JMSException {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
Message message = session.createMessage();
|
||||
message.setIntProperty(key, value);
|
||||
producer.send(message);
|
||||
return message;
|
||||
}
|
||||
|
||||
public static Message sendMessageWithProperty(final Session session,
|
||||
final Destination destination,
|
||||
final String key,
|
||||
final String value) throws JMSException {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty(key, value);
|
||||
producer.send(message);
|
||||
return message;
|
||||
}
|
||||
|
||||
public static Message sendMessageWithReplyTo(final Session session,
|
||||
final Destination destination,
|
||||
final String replyTo) throws JMSException {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
Message message = session.createMessage();
|
||||
message.setJMSReplyTo(ActiveMQJMSClient.createQueue(replyTo));
|
||||
producer.send(message);
|
||||
return message;
|
||||
}
|
||||
|
||||
public static void consumeMessages(final int expected, final Destination dest) throws JMSException {
|
||||
Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
|
||||
try {
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
|
|||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.junit.Before;
|
||||
|
||||
import javax.management.openmbean.CompositeData;
|
||||
|
||||
public class QueueControlUsingCoreTest extends QueueControlTest {
|
||||
|
||||
protected ClientSession session;
|
||||
|
@ -328,6 +330,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
return (Boolean) proxy.invokeOperation("isPaused");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompositeData[] browse(String filter) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String listConsumersAsJSON() throws Exception {
|
||||
return (String) proxy.invokeOperation("listConsumersAsJSON");
|
||||
|
|
Loading…
Reference in New Issue