ARTEMIS-3461 Generalize MBean Support on Messages and avoid converstion to core on AMQP Messages on console browsing

Done in collaboration with Erwin Dondorp through https://github.com/apache/activemq-artemis/pull/3794/
This commit is contained in:
Clebert Suconic 2021-10-14 13:25:56 -04:00 committed by clebertsuconic
parent fd12209488
commit a833d95c1f
11 changed files with 713 additions and 371 deletions

View File

@ -325,14 +325,19 @@ public final class JsonUtil {
private JsonUtil() {
}
public static String truncateString(final String str, final int valueSizeLimit) {
if (str.length() > valueSizeLimit) {
return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
} else {
return str;
}
}
public static Object truncate(final Object value, final int valueSizeLimit) {
Object result = value;
if (valueSizeLimit >= 0) {
if (String.class.equals(value.getClass())) {
String str = (String) value;
if (str.length() > valueSizeLimit) {
result = new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
}
result = truncateString((String)value, valueSizeLimit);
} else if (value.getClass().isArray()) {
if (byte[].class.equals(value.getClass())) {
if (((byte[]) value).length > valueSizeLimit) {

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
@ -769,6 +771,10 @@ public interface Message {
/** This should make you convert your message into Core format. */
ICoreMessage toCore();
default CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
return null;
}
/** This should make you convert your message into Core format. */
ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);

View File

@ -17,8 +17,15 @@
package org.apache.activemq.artemis.core.message.impl;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.SimpleType;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.zip.DataFormatException;
@ -33,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -40,6 +48,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@ -1216,6 +1226,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return this;
}
@Override
public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return this;
@ -1290,4 +1301,88 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return body;
}
// *******************************************************************************************************************************
// Composite Data implementation
private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory();
private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory();
@Override
public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
CompositeType ct;
Map<String, Object> fields;
byte type = getType();
switch (type) {
case Message.TEXT_TYPE:
ct = TEXT_FACTORY.getCompositeType();
fields = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount);
break;
default:
ct = BYTES_FACTORY.getCompositeType();
fields = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount);
break;
}
return new CompositeDataSupport(ct, fields);
}
static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
protected ArrayType body;
@Override
protected void init() throws OpenDataException {
super.init();
body = new ArrayType(SimpleType.BYTE, true);
addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
}
@Override
public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
rc.put(CompositeDataConstants.TYPE, m.getType());
if (!m.isLargeMessage()) {
ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1];
bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
} else {
rc.put(CompositeDataConstants.BODY, new byte[0]);
}
return rc;
}
}
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
@Override
protected void init() throws OpenDataException {
super.init();
addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
}
@Override
public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
rc.put(CompositeDataConstants.TYPE, m.getType());
if (!m.isLargeMessage()) {
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
} else {
SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
}
} else {
rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
}
return rc;
}
}
// Composite Data implementation
// *******************************************************************************************************************************
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.message.openmbean;
public interface CompositeDataConstants {
String ADDRESS = "address";
String MESSAGE_ID = "messageID";
String USER_ID = "userID";
String TYPE = "type";
String DURABLE = "durable";
String EXPIRATION = "expiration";
String PRIORITY = "priority";
String REDELIVERED = "redelivered";
String TIMESTAMP = "timestamp";
String BODY = "BodyPreview";
String TEXT_BODY = "text";
String LARGE_MESSAGE = "largeMessage";
String PERSISTENT_SIZE = "persistentSize";
String PROPERTIES = "PropertiesText";
String ADDRESS_DESCRIPTION = "The Address";
String MESSAGE_ID_DESCRIPTION = "The message ID";
String USER_ID_DESCRIPTION = "The user ID";
String TYPE_DESCRIPTION = "The message type";
String DURABLE_DESCRIPTION = "Is the message durable";
String EXPIRATION_DESCRIPTION = "The message expiration";
String PRIORITY_DESCRIPTION = "The message priority";
String REDELIVERED_DESCRIPTION = "Has the message been redelivered";
String TIMESTAMP_DESCRIPTION = "The message timestamp";
String BODY_DESCRIPTION = "The message body";
String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large message";
String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on disk";
String PROPERTIES_DESCRIPTION = "The properties text";
// User properties
String STRING_PROPERTIES = "StringProperties";
String BOOLEAN_PROPERTIES = "BooleanProperties";
String BYTE_PROPERTIES = "ByteProperties";
String SHORT_PROPERTIES = "ShortProperties";
String INT_PROPERTIES = "IntProperties";
String LONG_PROPERTIES = "LongProperties";
String FLOAT_PROPERTIES = "FloatProperties";
String DOUBLE_PROPERTIES = "DoubleProperties";
String STRING_PROPERTIES_DESCRIPTION = "User String Properties";
String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties";
String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties";
String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties";
String INT_PROPERTIES_DESCRIPTION = "User Int Properties";
String LONG_PROPERTIES_DESCRIPTION = "User Long Properties";
String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties";
String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties";
}

View File

@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.message.openmbean;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.jboss.logging.Logger;
public class MessageOpenTypeFactory<M extends Message> {
private static final Logger logger = Logger.getLogger(MessageOpenTypeFactory.class);
public MessageOpenTypeFactory() {
try {
init();
compositeType = createCompositeType();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
private CompositeType compositeType;
private final List<String> itemNamesList = new ArrayList<>();
private final List<String> itemDescriptionsList = new ArrayList<>();
private final List<OpenType> itemTypesList = new ArrayList<>();
protected TabularType stringPropertyTabularType;
protected TabularType booleanPropertyTabularType;
protected TabularType bytePropertyTabularType;
protected TabularType shortPropertyTabularType;
protected TabularType intPropertyTabularType;
protected TabularType longPropertyTabularType;
protected TabularType floatPropertyTabularType;
protected TabularType doublePropertyTabularType;
protected Object[][] typedPropertyFields;
protected String getTypeName() {
return Message.class.getName();
}
public CompositeType getCompositeType() throws OpenDataException {
return compositeType;
}
protected void init() throws OpenDataException {
addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN);
addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG);
addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
// now lets expose the type safe properties
stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
typedPropertyFields = new Object[][] {
{CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class},
{CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class},
{CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class},
{CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class},
{CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class},
{CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class},
{CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class},
{CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class}
};
}
public Map<String, Object> getFields(M m, int valueSizeLimit, int deliveryCount) throws OpenDataException {
Map<String, Object> rc = new HashMap<>();
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
if (m.getUserID() != null) {
rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
} else {
rc.put(CompositeDataConstants.USER_ID, "");
}
rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
rc.put(CompositeDataConstants.DURABLE, m.isDurable());
rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
rc.put(CompositeDataConstants.REDELIVERED, deliveryCount > 1);
rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage());
try {
rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize());
} catch (final ActiveMQException e1) {
rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
}
Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
// only populate if there are some values
TabularDataSupport tabularData;
for (Object[] typedPropertyInfo : typedPropertyFields) {
tabularData = null;
try {
tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
} catch (Exception ignored) {
}
if (tabularData != null && !tabularData.isEmpty()) {
rc.put((String) typedPropertyInfo[0], tabularData);
} else {
rc.put((String) typedPropertyInfo[0], null);
}
}
return rc;
}
protected String toString(Object value) {
if (value == null) {
return null;
}
return value.toString();
}
protected CompositeType createCompositeType() throws OpenDataException {
String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
}
protected String getDescription() {
return getTypeName();
}
protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
String[] keyValue = new String[]{"key", "value"};
OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
return new TabularType(typeName, typeName, rowType, new String[]{"key"});
}
protected TabularDataSupport createTabularData(Map<String, Object> entries,
TabularType type,
Class valueType) throws IOException, OpenDataException {
TabularDataSupport answer = new TabularDataSupport(type);
for (String key : entries.keySet()) {
Object value = entries.get(key);
if (valueType.isInstance(value)) {
CompositeDataSupport compositeData = createTabularRowValue(type, key, value);
answer.put(compositeData);
} else if (valueType == String.class && value instanceof SimpleString) {
CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString());
answer.put(compositeData);
}
}
return answer;
}
protected CompositeDataSupport createTabularRowValue(TabularType type,
String key,
Object value) throws OpenDataException {
Map<String, Object> fields = new HashMap<>();
fields.put("key", key);
fields.put("value", value);
return new CompositeDataSupport(type.getRowType(), fields);
}
protected void addItem(String name, String description, OpenType type) {
itemNamesList.add(name);
itemDescriptionsList.add(description);
itemTypesList.add(type);
}
}

View File

@ -16,11 +16,17 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.SimpleType;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -34,6 +40,8 @@ import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
@ -72,6 +80,8 @@ import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
/**
* See <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">AMQP v1.0 message format</a>
* <pre>
@ -834,9 +844,64 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
value = JsonUtil.truncate(value, valueSizeLimit);
map.put(name.toString(), value);
}
TypedProperties extraProperties = getExtraProperties();
if (extraProperties != null) {
extraProperties.forEach((s, o) -> {
map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
});
}
if (!isLargeMessage()) {
addAnnotationsAsProperties(map, messageAnnotations);
}
if (properties != null) {
if (properties.getContentType() != null) {
map.put("properties.getContentType()", properties.getContentType().toString());
}
if (properties.getContentEncoding() != null) {
map.put("properties.getContentEncoding()", properties.getContentEncoding().toString());
}
if (properties.getGroupId() != null) {
map.put("properties.getGroupID()", properties.getGroupId());
}
if (properties.getGroupSequence() != null) {
map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue());
}
if (properties.getReplyToGroupId() != null) {
map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId());
}
}
return map;
}
protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) {
if (annotations != null && annotations.getValue() != null) {
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
map.put("annotation x-opt-delivery-time", deliveryTime);
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
long delay = ((Number) entry.getValue()).longValue();
if (delay > 0) {
map.put("annotation x-opt-delivery-delay", System.currentTimeMillis() + delay);
}
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
map.put("annotation X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
} else {
try {
map.put("annotation " + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
}
}
}
}
}
@Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
try {
@ -1726,4 +1791,103 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
public void setOwner(Object object) {
this.owner = object;
}
// *******************************************************************************************************************************
// Composite Data implementation
private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
@Override
protected void init() throws OpenDataException {
super.init();
addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
}
@Override
public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
if (!m.isLargeMessage()) {
m.ensureScanning();
}
Properties properties = m.getCurrentProperties();
byte type = getType(m, properties);
rc.put(CompositeDataConstants.TYPE, type);
if (m.isLargeMessage()) {
rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
} else {
if (m.getBody() instanceof AmqpValue) {
Object amqpValue = ((AmqpValue) m.getBody()).getValue();
rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
} else {
rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
}
}
return rc;
}
private byte getType(AMQPMessage m, Properties properties) {
if (m.isLargeMessage()) {
return DEFAULT_TYPE;
}
byte type = BYTES_TYPE;
final Symbol contentType = properties != null ? properties.getContentType() : null;
final String contentTypeString = contentType != null ? contentType.toString() : null;
if (m.getBody() instanceof Data) {
if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
type = OBJECT_TYPE;
} else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
type = BYTES_TYPE;
} else {
Charset charset = getCharsetForTextualContent(contentTypeString);
if (StandardCharsets.UTF_8.equals(charset)) {
type = TEXT_TYPE;
}
}
} else if (m.getBody() instanceof AmqpSequence) {
type = STREAM_TYPE;
} else if (m.getBody() instanceof AmqpValue) {
Object value = ((AmqpValue) m.getBody()).getValue();
if (value instanceof String) {
type = TEXT_TYPE;
} else if (value instanceof Binary) {
if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
type = OBJECT_TYPE;
} else {
type = BYTES_TYPE;
}
} else if (value instanceof List) {
type = STREAM_TYPE;
} else if (value instanceof Map) {
type = MAP_TYPE;
}
}
return type;
}
}
@Override
public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
Map<String, Object> fields;
fields = AMQP_FACTORY.getFields(this, fieldsLimit, deliveryCount);
return new CompositeDataSupport(AMQP_FACTORY.getCompositeType(), fields);
}
// Composite Data implementation
// *******************************************************************************************************************************
}

View File

@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -61,9 +60,12 @@ import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
public class QueueControlImpl extends AbstractControl implements QueueControl {
private static final Logger logger = Logger.getLogger(QueueControlImpl.class);
public static final int FLUSH_LIMIT = 500;
// Constants -----------------------------------------------------
@ -1583,7 +1585,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
if (index >= start) {
c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
}
//we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
index++;
@ -1600,7 +1602,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
return rc;
}
} catch (ActiveMQException e) {
} catch (Exception e) {
logger.warn(e.getMessage(), e);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesFailure(queue.getName().toString());
}
@ -1635,7 +1638,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
while (iterator.hasNext() && currentPageSize++ < limit) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
}
}

View File

@ -1,12 +1,12 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -14,56 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.openmbean;
public interface CompositeDataConstants {
String ADDRESS = "address";
String MESSAGE_ID = "messageID";
String USER_ID = "userID";
String TYPE = "type";
String DURABLE = "durable";
String EXPIRATION = "expiration";
String PRIORITY = "priority";
String REDELIVERED = "redelivered";
String TIMESTAMP = "timestamp";
String BODY = "BodyPreview";
String TEXT_BODY = "text";
String LARGE_MESSAGE = "largeMessage";
String PERSISTENT_SIZE = "persistentSize";
String PROPERTIES = "PropertiesText";
String ADDRESS_DESCRIPTION = "The Address";
String MESSAGE_ID_DESCRIPTION = "The message ID";
String USER_ID_DESCRIPTION = "The user ID";
String TYPE_DESCRIPTION = "The message type";
String DURABLE_DESCRIPTION = "Is the message durable";
String EXPIRATION_DESCRIPTION = "The message expiration";
String PRIORITY_DESCRIPTION = "The message priority";
String REDELIVERED_DESCRIPTION = "Has the message been redelivered";
String TIMESTAMP_DESCRIPTION = "The message timestamp";
String BODY_DESCRIPTION = "The message body";
String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large message";
String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on disk";
String PROPERTIES_DESCRIPTION = "The properties text";
// User properties
String STRING_PROPERTIES = "StringProperties";
String BOOLEAN_PROPERTIES = "BooleanProperties";
String BYTE_PROPERTIES = "ByteProperties";
String SHORT_PROPERTIES = "ShortProperties";
String INT_PROPERTIES = "IntProperties";
String LONG_PROPERTIES = "LongProperties";
String FLOAT_PROPERTIES = "FloatProperties";
String DOUBLE_PROPERTIES = "DoubleProperties";
String STRING_PROPERTIES_DESCRIPTION = "User String Properties";
String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties";
String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties";
String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties";
String INT_PROPERTIES_DESCRIPTION = "User Int Properties";
String LONG_PROPERTIES_DESCRIPTION = "User Long Properties";
String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties";
String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties";
/**
* @deprecated use org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants
*/
@Deprecated
public interface CompositeDataConstants extends org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants {
}

View File

@ -1,305 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl.openmbean;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
public final class OpenTypeSupport {
private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory();
private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory();
private OpenTypeSupport() {
}
public static CompositeData convert(MessageReference ref, int valueSizeLimit) throws OpenDataException {
CompositeType ct;
ICoreMessage message = ref.getMessage().toCore();
Map<String, Object> fields;
byte type = message.getType();
switch(type) {
case Message.TEXT_TYPE:
ct = TEXT_FACTORY.getCompositeType();
fields = TEXT_FACTORY.getFields(ref, valueSizeLimit);
break;
default:
ct = BYTES_FACTORY.getCompositeType();
fields = BYTES_FACTORY.getFields(ref, valueSizeLimit);
break;
}
return new CompositeDataSupport(ct, fields);
}
static class MessageOpenTypeFactory {
private CompositeType compositeType;
private final List<String> itemNamesList = new ArrayList<>();
private final List<String> itemDescriptionsList = new ArrayList<>();
private final List<OpenType> itemTypesList = new ArrayList<>();
protected TabularType stringPropertyTabularType;
protected TabularType booleanPropertyTabularType;
protected TabularType bytePropertyTabularType;
protected TabularType shortPropertyTabularType;
protected TabularType intPropertyTabularType;
protected TabularType longPropertyTabularType;
protected TabularType floatPropertyTabularType;
protected TabularType doublePropertyTabularType;
protected Object[][] typedPropertyFields;
protected String getTypeName() {
return Message.class.getName();
}
public CompositeType getCompositeType() throws OpenDataException {
if (compositeType == null) {
init();
compositeType = createCompositeType();
}
return compositeType;
}
protected void init() throws OpenDataException {
addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN);
addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG);
addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
// now lets expose the type safe properties
stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
typedPropertyFields = new Object[][] {
{CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class},
{CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class},
{CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class},
{CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class},
{CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class},
{CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class},
{CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class},
{CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class}
};
}
public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
Map<String, Object> rc = new HashMap<>();
ICoreMessage m = ref.getMessage().toCore();
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
if (m.getUserID() != null) {
rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
} else {
rc.put(CompositeDataConstants.USER_ID, "");
}
rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
rc.put(CompositeDataConstants.TYPE, m.getType());
rc.put(CompositeDataConstants.DURABLE, m.isDurable());
rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage());
try {
rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize());
} catch (final ActiveMQException e1) {
rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
}
Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
// only populate if there are some values
TabularDataSupport tabularData;
for (Object[] typedPropertyInfo : typedPropertyFields) {
tabularData = null;
try {
tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
} catch (Exception ignored) {
}
if (tabularData != null && !tabularData.isEmpty()) {
rc.put((String) typedPropertyInfo[0], tabularData);
} else {
rc.put((String) typedPropertyInfo[0], null);
}
}
return rc;
}
protected String toString(Object value) {
if (value == null) {
return null;
}
return value.toString();
}
protected CompositeType createCompositeType() throws OpenDataException {
String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
}
protected String getDescription() {
return getTypeName();
}
protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
String[] keyValue = new String[]{"key", "value"};
OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
return new TabularType(typeName, typeName, rowType, new String[]{"key"});
}
protected TabularDataSupport createTabularData(Map<String, Object> entries,
TabularType type,
Class valueType) throws IOException, OpenDataException {
TabularDataSupport answer = new TabularDataSupport(type);
for (String key : entries.keySet()) {
Object value = entries.get(key);
if (valueType.isInstance(value)) {
CompositeDataSupport compositeData = createTabularRowValue(type, key, value);
answer.put(compositeData);
} else if (valueType == String.class && value instanceof SimpleString) {
CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString());
answer.put(compositeData);
}
}
return answer;
}
protected CompositeDataSupport createTabularRowValue(TabularType type,
String key,
Object value) throws OpenDataException {
Map<String, Object> fields = new HashMap<>();
fields.put("key", key);
fields.put("value", value);
return new CompositeDataSupport(type.getRowType(), fields);
}
protected void addItem(String name, String description, OpenType type) {
itemNamesList.add(name);
itemDescriptionsList.add(description);
itemTypesList.add(type);
}
}
static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected ArrayType body;
@Override
protected void init() throws OpenDataException {
super.init();
body = new ArrayType(SimpleType.BYTE, true);
addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
}
@Override
public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
ICoreMessage m = ref.getMessage().toCore();
if (!m.isLargeMessage()) {
ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1];
bodyCopy.readBytes(bytes);
rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
} else {
rc.put(CompositeDataConstants.BODY, new byte[0]);
}
return rc;
}
}
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected SimpleType text;
@Override
protected void init() throws OpenDataException {
super.init();
addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
}
@Override
public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
ICoreMessage m = ref.getMessage().toCore();
if (!m.isLargeMessage()) {
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
} else {
SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
}
} else {
rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
}
return rc;
}
}
}

View File

@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.management.impl.openmbean;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.junit.Assert;
import org.junit.Test;
@ -39,7 +38,7 @@ public class OpenTypeSupportTest {
TextMessageUtil.writeBodyText(coreMessage.getBodyBuffer(), SimpleString.toSimpleString(bodyText));
CompositeData cd = OpenTypeSupport.convert(new MessageReferenceImpl(coreMessage, null), 256);
CompositeData cd = coreMessage.toCompositeData(256, 1);
Assert.assertEquals(bodyText, cd.get("text"));
}

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
@ -70,10 +76,12 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
@ -86,8 +94,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES;
import static org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.BODY;
import static org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES;
@RunWith(value = Parameterized.class)
public class QueueControlTest extends ManagementTestBase {
@ -3447,6 +3455,123 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testSendMessageWithAMQP() throws Exception {
SimpleString address = new SimpleString("address_testSendMessageWithAMQP");
SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP");
server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null);
QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
{ // a namespace
ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
try (Connection connection = factory.createConnection("myUser", "myPassword")) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("theAMQPBody");
message.setStringProperty("protocolUsed", "amqp");
producer.send(message);
}
}
{ // a namespace
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection("myUser", "myPassword")) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("theCoreBody");
message.setStringProperty("protocolUsed", "core");
producer.send(message);
}
}
Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100);
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(2, browse.length);
String body = (String) browse[0].get("text");
Assert.assertNotNull(body);
Assert.assertEquals("theAMQPBody", body);
body = (String) browse[1].get("text");
Assert.assertNotNull(body);
Assert.assertEquals("theCoreBody", body);
}
@Test
public void testSendMessageWithAMQPLarge() throws Exception {
SimpleString address = new SimpleString("address_testSendMessageWithAMQP");
SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP");
server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null);
QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);
StringBuffer bufferLarge = new StringBuffer();
for (int i = 0; i < 100 * 1024; i++) {
bufferLarge.append("*-");
}
{ // a namespace
ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
try (Connection connection = factory.createConnection("myUser", "myPassword")) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(bufferLarge.toString());
message.setStringProperty("protocolUsed", "amqp");
producer.send(message);
}
}
{ // a namespace
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = factory.createConnection("myUser", "myPassword")) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(bufferLarge.toString());
message.setStringProperty("protocolUsed", "core");
producer.send(message);
}
}
Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100);
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(2, browse.length);
String body = (String) browse[0].get("text");
Assert.assertNotNull(body);
body = (String) browse[1].get("text");
Assert.assertNotNull(body);
}
@Test
public void testSendMessageWithMessageId() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
@ -3763,7 +3888,7 @@ public class QueueControlTest extends ManagementTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
Configuration conf = createDefaultInVMConfig().setJMXManagementEnabled(true);
Configuration conf = createDefaultConfig(true).setJMXManagementEnabled(true);
server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, true));
server.start();