ARTEMIS-3529 Fixing test and tweaks on properties
This commit is contained in:
parent
6bb4aa7a10
commit
54418dfcaf
|
@ -1344,7 +1344,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
|
||||
rc.put(CompositeDataConstants.TYPE, m.getType());
|
||||
if (!m.isLargeMessage()) {
|
||||
ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
|
||||
ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
|
||||
byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1];
|
||||
bodyCopy.readBytes(bytes);
|
||||
rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
|
||||
|
@ -1370,7 +1370,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
|
||||
} else {
|
||||
SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
|
||||
SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -842,34 +842,45 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
value = ((Binary)value).getArray();
|
||||
}
|
||||
value = JsonUtil.truncate(value, valueSizeLimit);
|
||||
map.put(name.toString(), value);
|
||||
map.put("applicationProperties." + name, value);
|
||||
}
|
||||
|
||||
TypedProperties extraProperties = getExtraProperties();
|
||||
if (extraProperties != null) {
|
||||
extraProperties.forEach((s, o) -> {
|
||||
map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
|
||||
map.put("extraProperties." + s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
|
||||
});
|
||||
}
|
||||
if (!isLargeMessage()) {
|
||||
|
||||
addAnnotationsAsProperties(map, messageAnnotations);
|
||||
}
|
||||
|
||||
if (properties != null) {
|
||||
if (properties.getContentType() != null) {
|
||||
map.put("properties.getContentType()", properties.getContentType().toString());
|
||||
map.put("properties.contentType", properties.getContentType().toString());
|
||||
}
|
||||
if (properties.getContentEncoding() != null) {
|
||||
map.put("properties.getContentEncoding()", properties.getContentEncoding().toString());
|
||||
map.put("properties.contentEncoding", properties.getContentEncoding().toString());
|
||||
}
|
||||
if (properties.getGroupId() != null) {
|
||||
map.put("properties.getGroupID()", properties.getGroupId());
|
||||
map.put("properties.groupID", properties.getGroupId());
|
||||
}
|
||||
if (properties.getGroupSequence() != null) {
|
||||
map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue());
|
||||
map.put("properties.groupSequence", properties.getGroupSequence().intValue());
|
||||
}
|
||||
if (properties.getReplyToGroupId() != null) {
|
||||
map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId());
|
||||
map.put("properties.replyToGroupId", properties.getReplyToGroupId());
|
||||
}
|
||||
if (properties.getCreationTime() != null) {
|
||||
map.put("properties.creationTime", properties.getCreationTime().getTime());
|
||||
}
|
||||
if (properties.getAbsoluteExpiryTime() != null) {
|
||||
map.put("properties.absoluteExpiryTime", properties.getCreationTime().getTime());
|
||||
}
|
||||
if (properties.getTo() != null) {
|
||||
map.put("properties.to", properties.getTo());
|
||||
}
|
||||
if (properties.getSubject() != null) {
|
||||
map.put("properties.subject", properties.getSubject());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -883,18 +894,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
String key = entry.getKey().toString();
|
||||
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
|
||||
long deliveryTime = ((Number) entry.getValue()).longValue();
|
||||
map.put("annotation x-opt-delivery-time", deliveryTime);
|
||||
map.put("message-annotation.x-opt-delivery-time", deliveryTime);
|
||||
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
|
||||
long delay = ((Number) entry.getValue()).longValue();
|
||||
if (delay > 0) {
|
||||
map.put("annotation x-opt-delivery-delay", System.currentTimeMillis() + delay);
|
||||
}
|
||||
map.put("message-annotation.x-opt-delivery-delay", delay);
|
||||
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
|
||||
map.put("annotation X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
|
||||
map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
|
||||
} else {
|
||||
try {
|
||||
map.put("annotation " + key, entry.getValue());
|
||||
map.put("message-annotation." + key, entry.getValue());
|
||||
} catch (ActiveMQPropertyConversionException e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1807,12 +1817,12 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
|
||||
@Override
|
||||
public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
|
||||
|
||||
if (!m.isLargeMessage()) {
|
||||
m.ensureScanning();
|
||||
}
|
||||
|
||||
Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
|
||||
|
||||
Properties properties = m.getCurrentProperties();
|
||||
|
||||
byte type = getType(m, properties);
|
||||
|
@ -1822,12 +1832,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
if (m.isLargeMessage()) {
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
|
||||
} else {
|
||||
if (m.getBody() instanceof AmqpValue) {
|
||||
Object amqpValue = ((AmqpValue) m.getBody()).getValue();
|
||||
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
|
||||
Object amqpValue;
|
||||
if (m.getBody() instanceof AmqpValue && (amqpValue = ((AmqpValue) m.getBody()).getValue()) != null) {
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(String.valueOf(amqpValue), valueSizeLimit));
|
||||
} else {
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(String.valueOf(m.getBody()), valueSizeLimit));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1840,10 +1849,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
}
|
||||
byte type = BYTES_TYPE;
|
||||
|
||||
if (m.getBody() == null) {
|
||||
return DEFAULT_TYPE;
|
||||
}
|
||||
|
||||
final Symbol contentType = properties != null ? properties.getContentType() : null;
|
||||
final String contentTypeString = contentType != null ? contentType.toString() : null;
|
||||
|
||||
if (m.getBody() instanceof Data) {
|
||||
if (m.getBody() instanceof Data && contentType != null) {
|
||||
|
||||
if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
|
||||
type = OBJECT_TYPE;
|
||||
|
|
|
@ -359,7 +359,7 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
|
|||
assertEquals(1, browseResult.length);
|
||||
|
||||
if ((boolean) browseResult[0].get("largeMessage")) {
|
||||
assertTrue(browseResult[0].containsKey("BodyPreview"));
|
||||
assertTrue(browseResult[0].containsKey("text"));
|
||||
}
|
||||
|
||||
connection = client.createConnection();
|
||||
|
|
Loading…
Reference in New Issue