This closes #1012
This commit is contained in:
commit
db2c711e24
|
@ -746,7 +746,11 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
message.setDurable(durable);
|
||||
message.setTimestamp(System.currentTimeMillis());
|
||||
if (body != null) {
|
||||
message.getBodyBuffer().writeBytes(Base64.decode(body));
|
||||
if (type == Message.TEXT_TYPE) {
|
||||
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
|
||||
} else {
|
||||
message.getBodyBuffer().writeBytes(Base64.decode(body));
|
||||
}
|
||||
}
|
||||
message.setAddress(queue.getAddress());
|
||||
ByteBuffer buffer = ByteBuffer.allocate(8);
|
||||
|
|
|
@ -27,7 +27,8 @@ public interface CompositeDataConstants {
|
|||
String PRIORITY = "priority";
|
||||
String REDELIVERED = "redelivered";
|
||||
String TIMESTAMP = "timestamp";
|
||||
String BODY = "body";
|
||||
String BODY = "BodyPreview";
|
||||
String TEXT_BODY = "text";
|
||||
String PROPERTIES = "PropertiesText";
|
||||
|
||||
String ADDRESS_DESCRIPTION = "The Address";
|
||||
|
|
|
@ -35,17 +35,32 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
|
||||
public final class OpenTypeSupport {
|
||||
|
||||
private static MessageOpenTypeFactory FACTORY = new MessageOpenTypeFactory();
|
||||
private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory();
|
||||
private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory();
|
||||
|
||||
private OpenTypeSupport() {
|
||||
}
|
||||
|
||||
public static CompositeData convert(MessageReference ref) throws OpenDataException {
|
||||
CompositeType ct = FACTORY.getCompositeType();
|
||||
Map<String, Object> fields = FACTORY.getFields(ref);
|
||||
CompositeType ct;
|
||||
|
||||
Map<String, Object> fields;
|
||||
byte type = ref.getMessage().getType();
|
||||
|
||||
switch(type) {
|
||||
case Message.TEXT_TYPE:
|
||||
ct = TEXT_FACTORY.getCompositeType();
|
||||
fields = TEXT_FACTORY.getFields(ref);
|
||||
break;
|
||||
default:
|
||||
ct = BYTES_FACTORY.getCompositeType();
|
||||
fields = BYTES_FACTORY.getFields(ref);
|
||||
break;
|
||||
}
|
||||
return new CompositeDataSupport(ct, fields);
|
||||
}
|
||||
|
||||
|
@ -65,8 +80,6 @@ public final class OpenTypeSupport {
|
|||
protected TabularType floatPropertyTabularType;
|
||||
protected TabularType doublePropertyTabularType;
|
||||
|
||||
protected ArrayType body;
|
||||
|
||||
protected String getTypeName() {
|
||||
return Message.class.getName();
|
||||
}
|
||||
|
@ -103,10 +116,6 @@ public final class OpenTypeSupport {
|
|||
floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
|
||||
doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
|
||||
|
||||
body = new ArrayType(SimpleType.BYTE, true);
|
||||
|
||||
addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
|
||||
|
||||
addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
|
||||
addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
|
||||
addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
|
||||
|
@ -134,11 +143,6 @@ public final class OpenTypeSupport {
|
|||
rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
|
||||
rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1);
|
||||
|
||||
ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
|
||||
byte[] bytes = new byte[bodyCopy.readableBytes()];
|
||||
bodyCopy.readBytes(bytes);
|
||||
rc.put(CompositeDataConstants.BODY, bytes);
|
||||
|
||||
Map<String, Object> propertyMap = m.toPropertyMap();
|
||||
|
||||
rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
|
||||
|
@ -246,4 +250,45 @@ public final class OpenTypeSupport {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
protected ArrayType body;
|
||||
|
||||
@Override
|
||||
protected void init() throws OpenDataException {
|
||||
super.init();
|
||||
body = new ArrayType(SimpleType.BYTE, true);
|
||||
addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(ref);
|
||||
ServerMessage m = ref.getMessage();
|
||||
ActiveMQBuffer bodyCopy = m.getBodyBufferDuplicate();
|
||||
byte[] bytes = new byte[bodyCopy.readableBytes()];
|
||||
bodyCopy.readBytes(bytes);
|
||||
rc.put(CompositeDataConstants.BODY, bytes);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
|
||||
protected SimpleType text;
|
||||
|
||||
@Override
|
||||
protected void init() throws OpenDataException {
|
||||
super.init();
|
||||
addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(ref);
|
||||
ServerMessage m = ref.getMessage();
|
||||
SimpleString text = m.getBodyBuffer().copy().readNullableSimpleString();
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue