This closes #3841
This commit is contained in:
commit
46e28136c3
|
@ -330,6 +330,8 @@ public interface Message {
|
|||
*/
|
||||
long getMessageID();
|
||||
|
||||
String getProtocolName();
|
||||
|
||||
// used for NO-LOCAL: mainly for AMQP
|
||||
default Message setConnectionID(String connectionID) {
|
||||
return this;
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.api.core.Message;
|
|||
import org.apache.activemq.artemis.api.core.RefCountMessage;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
||||
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.core.message.LargeBodyReader;
|
||||
|
@ -121,6 +122,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
|||
this.coreMessageObjectPools = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
return ActiveMQClient.DEFAULT_CORE_PROTOCOL;
|
||||
}
|
||||
|
||||
/** On core there's no delivery annotation */
|
||||
@Override
|
||||
public Object getAnnotation(SimpleString key) {
|
||||
|
|
|
@ -36,6 +36,12 @@ public class MessageInternalImpl implements MessageInternal {
|
|||
|
||||
private CoreMessage message;
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
// should normally not be visible in GUI
|
||||
return getClass().getName();
|
||||
}
|
||||
|
||||
public MessageInternalImpl(ICoreMessage message) {
|
||||
this.message = (CoreMessage) message;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ public interface CompositeDataConstants {
|
|||
|
||||
String ADDRESS = "address";
|
||||
String MESSAGE_ID = "messageID";
|
||||
String PROTOCOL = "protocol";
|
||||
String USER_ID = "userID";
|
||||
String TYPE = "type";
|
||||
String DURABLE = "durable";
|
||||
|
@ -35,6 +36,7 @@ public interface CompositeDataConstants {
|
|||
|
||||
String ADDRESS_DESCRIPTION = "The Address";
|
||||
String MESSAGE_ID_DESCRIPTION = "The message ID";
|
||||
String PROTOCOL_DESCRIPTION = "The message protocol";
|
||||
String USER_ID_DESCRIPTION = "The user ID";
|
||||
String TYPE_DESCRIPTION = "The message type";
|
||||
String DURABLE_DESCRIPTION = "Is the message durable";
|
||||
|
|
|
@ -77,6 +77,7 @@ public class MessageOpenTypeFactory<M extends Message> {
|
|||
addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
|
||||
addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.PROTOCOL, CompositeDataConstants.PROTOCOL_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
|
||||
addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
|
||||
addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
|
||||
|
@ -123,6 +124,7 @@ public class MessageOpenTypeFactory<M extends Message> {
|
|||
public Map<String, Object> getFields(M m, int valueSizeLimit, int deliveryCount) throws OpenDataException {
|
||||
Map<String, Object> rc = new HashMap<>();
|
||||
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
|
||||
rc.put(CompositeDataConstants.PROTOCOL, m.getProtocolName());
|
||||
if (m.getUserID() != null) {
|
||||
rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
|
||||
} else {
|
||||
|
|
|
@ -260,6 +260,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
this.coreMessageObjectPools = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
return ProtonProtocolManagerFactory.AMQP_PROTOCOL_NAME;
|
||||
}
|
||||
|
||||
public final MessageDataScanningStatus getDataScanningStatus() {
|
||||
return MessageDataScanningStatus.valueOf(messageDataScanned);
|
||||
}
|
||||
|
|
|
@ -33,6 +33,11 @@ import io.netty.buffer.ByteBuf;
|
|||
// TODO: Implement this
|
||||
public class OpenwireMessage implements Message {
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
return OpenWireProtocolManagerFactory.OPENWIRE_PROTOCOL_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsProperty(SimpleString key) {
|
||||
return false;
|
||||
|
|
|
@ -296,6 +296,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
|
||||
class FakeMessage implements Message {
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
// should normally not be visible in GUI
|
||||
return getClass().getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getReplyTo() {
|
||||
return null;
|
||||
|
|
|
@ -338,6 +338,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
|
|||
|
||||
final long id;
|
||||
|
||||
@Override
|
||||
public String getProtocolName() {
|
||||
// should normally not be visible in GUI
|
||||
return getClass().getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getReplyTo() {
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue