ARTEMIS-3529 Fixing integration tests after Web Console Parsing of Large Messages
This commit is contained in:
parent
54418dfcaf
commit
1db3ae1dc0
|
@ -141,7 +141,7 @@ public class MessageOpenTypeFactory<M extends Message> {
|
||||||
rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
|
rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
|
Map<String, Object> propertyMap = expandProperties(m, valueSizeLimit);
|
||||||
|
|
||||||
rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
|
rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
|
||||||
|
|
||||||
|
@ -162,6 +162,10 @@ public class MessageOpenTypeFactory<M extends Message> {
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Map<String, Object> expandProperties(M m, int valueSizeLimit) {
|
||||||
|
return m.toPropertyMap(valueSizeLimit);
|
||||||
|
}
|
||||||
|
|
||||||
protected String toString(Object value) {
|
protected String toString(Object value) {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -602,9 +602,14 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getPersistentSize() throws ActiveMQException {
|
public long getPersistentSize() {
|
||||||
|
try {
|
||||||
|
return largeBody.getBodySize();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Persister<Message> getPersister() {
|
public Persister<Message> getPersister() {
|
||||||
|
|
|
@ -834,6 +834,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
|
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
|
||||||
|
return toPropertyMap(false, valueSizeLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> toPropertyMap(boolean expandPropertyType, int valueSizeLimit) {
|
||||||
|
String extraPropertiesPrefix;
|
||||||
|
String applicationPropertiesPrefix;
|
||||||
|
String annotationPrefix;
|
||||||
|
String propertiesPrefix;
|
||||||
|
if (expandPropertyType) {
|
||||||
|
extraPropertiesPrefix = "extraProperties.";
|
||||||
|
applicationPropertiesPrefix = "applicationProperties.";
|
||||||
|
annotationPrefix = "messageAnnotations.";
|
||||||
|
propertiesPrefix = "properties.";
|
||||||
|
} else {
|
||||||
|
extraPropertiesPrefix = "";
|
||||||
|
applicationPropertiesPrefix = "";
|
||||||
|
annotationPrefix = "";
|
||||||
|
propertiesPrefix = "";
|
||||||
|
}
|
||||||
Map map = new HashMap<>();
|
Map map = new HashMap<>();
|
||||||
for (SimpleString name : getPropertyNames()) {
|
for (SimpleString name : getPropertyNames()) {
|
||||||
Object value = getObjectProperty(name.toString());
|
Object value = getObjectProperty(name.toString());
|
||||||
|
@ -842,45 +861,45 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
value = ((Binary)value).getArray();
|
value = ((Binary)value).getArray();
|
||||||
}
|
}
|
||||||
value = JsonUtil.truncate(value, valueSizeLimit);
|
value = JsonUtil.truncate(value, valueSizeLimit);
|
||||||
map.put("applicationProperties." + name, value);
|
map.put(applicationPropertiesPrefix + name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
TypedProperties extraProperties = getExtraProperties();
|
TypedProperties extraProperties = getExtraProperties();
|
||||||
if (extraProperties != null) {
|
if (extraProperties != null) {
|
||||||
extraProperties.forEach((s, o) -> {
|
extraProperties.forEach((s, o) -> {
|
||||||
map.put("extraProperties." + s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
|
map.put(extraPropertiesPrefix + s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
addAnnotationsAsProperties(map, messageAnnotations);
|
addAnnotationsAsProperties(annotationPrefix, map, messageAnnotations);
|
||||||
|
|
||||||
if (properties != null) {
|
if (properties != null) {
|
||||||
if (properties.getContentType() != null) {
|
if (properties.getContentType() != null) {
|
||||||
map.put("properties.contentType", properties.getContentType().toString());
|
map.put(propertiesPrefix + "contentType", properties.getContentType().toString());
|
||||||
}
|
}
|
||||||
if (properties.getContentEncoding() != null) {
|
if (properties.getContentEncoding() != null) {
|
||||||
map.put("properties.contentEncoding", properties.getContentEncoding().toString());
|
map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString());
|
||||||
}
|
}
|
||||||
if (properties.getGroupId() != null) {
|
if (properties.getGroupId() != null) {
|
||||||
map.put("properties.groupID", properties.getGroupId());
|
map.put(propertiesPrefix + "groupID", properties.getGroupId());
|
||||||
}
|
}
|
||||||
if (properties.getGroupSequence() != null) {
|
if (properties.getGroupSequence() != null) {
|
||||||
map.put("properties.groupSequence", properties.getGroupSequence().intValue());
|
map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue());
|
||||||
}
|
}
|
||||||
if (properties.getReplyToGroupId() != null) {
|
if (properties.getReplyToGroupId() != null) {
|
||||||
map.put("properties.replyToGroupId", properties.getReplyToGroupId());
|
map.put(propertiesPrefix + "replyToGroupId", properties.getReplyToGroupId());
|
||||||
}
|
}
|
||||||
if (properties.getCreationTime() != null) {
|
if (properties.getCreationTime() != null) {
|
||||||
map.put("properties.creationTime", properties.getCreationTime().getTime());
|
map.put(propertiesPrefix + "creationTime", properties.getCreationTime().getTime());
|
||||||
}
|
}
|
||||||
if (properties.getAbsoluteExpiryTime() != null) {
|
if (properties.getAbsoluteExpiryTime() != null) {
|
||||||
map.put("properties.absoluteExpiryTime", properties.getCreationTime().getTime());
|
map.put(propertiesPrefix + "absoluteExpiryTime", properties.getCreationTime().getTime());
|
||||||
}
|
}
|
||||||
if (properties.getTo() != null) {
|
if (properties.getTo() != null) {
|
||||||
map.put("properties.to", properties.getTo());
|
map.put(propertiesPrefix + "to", properties.getTo());
|
||||||
}
|
}
|
||||||
if (properties.getSubject() != null) {
|
if (properties.getSubject() != null) {
|
||||||
map.put("properties.subject", properties.getSubject());
|
map.put(propertiesPrefix + "subject", properties.getSubject());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -888,21 +907,21 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) {
|
protected static void addAnnotationsAsProperties(String prefix, Map map, MessageAnnotations annotations) {
|
||||||
if (annotations != null && annotations.getValue() != null) {
|
if (annotations != null && annotations.getValue() != null) {
|
||||||
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
|
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
|
||||||
String key = entry.getKey().toString();
|
String key = entry.getKey().toString();
|
||||||
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
|
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
|
||||||
long deliveryTime = ((Number) entry.getValue()).longValue();
|
long deliveryTime = ((Number) entry.getValue()).longValue();
|
||||||
map.put("message-annotation.x-opt-delivery-time", deliveryTime);
|
map.put(prefix + "x-opt-delivery-time", deliveryTime);
|
||||||
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
|
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
|
||||||
long delay = ((Number) entry.getValue()).longValue();
|
long delay = ((Number) entry.getValue()).longValue();
|
||||||
map.put("message-annotation.x-opt-delivery-delay", delay);
|
map.put("x-opt-delivery-delay", delay);
|
||||||
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
|
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
|
||||||
map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
|
map.put("X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
map.put("message-annotation." + key, entry.getValue());
|
map.put(prefix + key, entry.getValue());
|
||||||
} catch (ActiveMQPropertyConversionException e) {
|
} catch (ActiveMQPropertyConversionException e) {
|
||||||
logger.warn(e.getMessage(), e);
|
logger.warn(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
@ -1843,6 +1862,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Map<String, Object> expandProperties(AMQPMessage m, int valueSizeLimit) {
|
||||||
|
return m.toPropertyMap(true, valueSizeLimit);
|
||||||
|
}
|
||||||
|
|
||||||
private byte getType(AMQPMessage m, Properties properties) {
|
private byte getType(AMQPMessage m, Properties properties) {
|
||||||
if (m.isLargeMessage()) {
|
if (m.isLargeMessage()) {
|
||||||
return DEFAULT_TYPE;
|
return DEFAULT_TYPE;
|
||||||
|
|
|
@ -80,12 +80,22 @@ public class JMXManagementTest extends JMSClientTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
//before commit
|
//before commit
|
||||||
assertEquals(num, queueControl.getDeliveringCount());
|
Wait.assertEquals(num, () -> queueControl.getDeliveringCount());
|
||||||
|
|
||||||
Map<String, Map<String, Object>[]> result = queueControl.listDeliveringMessages();
|
Map<String, Map<String, Object>[]> result = null;
|
||||||
|
Map<String, Object>[] msgMaps = null;
|
||||||
|
// we might need some retry, and Wait.assert won't be as efficient on this case
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
result = queueControl.listDeliveringMessages();
|
||||||
assertEquals(1, result.size());
|
assertEquals(1, result.size());
|
||||||
|
|
||||||
Map<String, Object>[] msgMaps = result.entrySet().iterator().next().getValue();
|
msgMaps = result.entrySet().iterator().next().getValue();
|
||||||
|
if (msgMaps.length == num) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assertEquals(num, msgMaps.length);
|
assertEquals(num, msgMaps.length);
|
||||||
|
|
||||||
|
|
|
@ -359,7 +359,8 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
|
||||||
assertEquals(1, browseResult.length);
|
assertEquals(1, browseResult.length);
|
||||||
|
|
||||||
if ((boolean) browseResult[0].get("largeMessage")) {
|
if ((boolean) browseResult[0].get("largeMessage")) {
|
||||||
assertTrue(browseResult[0].containsKey("text"));
|
// The AMQPMessage will part the body as text (...Large Message...) while core will parse it differently
|
||||||
|
assertTrue(browseResult[0].containsKey("text") || browseResult[0].containsKey("BodyPreview"));
|
||||||
}
|
}
|
||||||
|
|
||||||
connection = client.createConnection();
|
connection = client.createConnection();
|
||||||
|
|
Loading…
Reference in New Issue