This commit is contained in:
Justin Bertram 2017-03-09 08:12:42 -06:00
commit 78e935b184
14 changed files with 147 additions and 58 deletions

View File

@ -167,7 +167,9 @@ public interface Message {
default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) {
Object obj = getDeliveryAnnotationProperty(property);
if (obj instanceof SimpleString) {
if (obj == null) {
return null;
} else if (obj instanceof SimpleString) {
return (SimpleString)obj;
} else {
return SimpleString.toSimpleString(obj.toString());
@ -232,6 +234,10 @@ public interface Message {
* */
RefCountMessageListener getContext();
default SimpleString getGroupID() {
return null;
}
SimpleString getReplyTo();
Message setReplyTo(SimpleString address);
@ -256,6 +262,15 @@ public interface Message {
*/
long getMessageID();
// used for NO-LOCAL: mainly for AMQP
default Message setConnectionID(String connectionID) {
return this;
}
default String getConnectionID() {
return null;
}
Message setMessageID(long id);
default boolean isLargeMessage() {
@ -503,7 +518,7 @@ public interface Message {
Object getObjectProperty(SimpleString key);
Object removeDeliveryAnnoationProperty(SimpleString key);
Object removeDeliveryAnnotationProperty(SimpleString key);
Object getDeliveryAnnotationProperty(SimpleString key);

View File

@ -104,7 +104,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
/** On core there's no delivery annotation */
@Override
public Object removeDeliveryAnnoationProperty(SimpleString key) {
public Object removeDeliveryAnnotationProperty(SimpleString key) {
return removeProperty(key);
}
@ -194,6 +194,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
@Override
public SimpleString getGroupID() {
return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
}
/**
*
* @param sendBuffer

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@ -72,6 +73,7 @@ public class AMQPMessage extends RefCountMessage {
private Properties _properties;
private ApplicationProperties applicationProperties;
private long scheduledTime = -1;
private String connectionID;
public AMQPMessage(long messageFormat, byte[] data) {
this.data = Unpooled.wrappedBuffer(data);
@ -128,7 +130,6 @@ public class AMQPMessage extends RefCountMessage {
}
private Map getApplicationPropertiesMap() {
ApplicationProperties appMap = getApplicationProperties();
Map map = null;
@ -158,6 +159,17 @@ public class AMQPMessage extends RefCountMessage {
parsedHeaders = true;
}
}
@Override
public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
this.connectionID = connectionID;
return this;
}
@Override
public String getConnectionID() {
return connectionID;
}
public MessageAnnotations getMessageAnnotations() {
parseHeaders();
@ -222,6 +234,17 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
public SimpleString getGroupID() {
parseHeaders();
if (_properties != null && _properties.getGroupId() != null) {
return SimpleString.toSimpleString(_properties.getGroupId());
} else {
return null;
}
}
@Override
public Long getScheduledDeliveryTime() {
@ -667,12 +690,14 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object getObjectProperty(String key) {
if (key.equals("JMSType")) {
if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
return getProperties().getSubject();
}
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
return getConnectionID();
} else {
return getApplicationPropertiesMap().get(key);
}
}
@Override
public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
@ -686,11 +711,14 @@ public class AMQPMessage extends RefCountMessage {
@Override
public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
if (key.equals("JMSType")) {
if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
return getProperties().getSubject();
}
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
return getConnectionID();
} else {
return (String)getApplicationPropertiesMap().get(key);
}
}
@Override
public boolean containsDeliveryAnnotationProperty(SimpleString key) {
@ -702,7 +730,7 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
public Object removeDeliveryAnnoationProperty(SimpleString key) {
public Object removeDeliveryAnnotationProperty(SimpleString key) {
parseHeaders();
if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
return null;

View File

@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@ -394,10 +393,10 @@ public class AMQPSessionCallback implements SessionCallback {
final Receiver receiver) throws Exception {
try {
// message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
serverSession.send(transaction, message, false, false);
// FIXME Potential race here...
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
@ -421,10 +420,6 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
public String getPubSubPrefix() {
return manager.getPubSubPrefix();
}
public void offerProducerCredit(final String address,
final int credits,
final int threshold,
@ -482,8 +477,6 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
try {

View File

@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
@ -44,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResource
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
@ -193,7 +193,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean noLocal = false;
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector.endsWith(noLocalFilter)) {
if (selector.length() > noLocalFilter.length()) {
@ -283,7 +283,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) {
selector += " AND " + noLocalFilter;
} else {

View File

@ -55,7 +55,6 @@ import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
@ -374,8 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
}
ConsumerId consumerId = messageSend.getTargetConsumerId();
String userId = messageSend.getUserID();
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);

View File

@ -63,7 +63,7 @@ public class OpenwireMessage implements Message {
}
@Override
public Object removeDeliveryAnnoationProperty(SimpleString key) {
public Object removeDeliveryAnnotationProperty(SimpleString key) {
return null;
}

View File

@ -238,8 +238,7 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/
if (message.containsProperty(Message.HDR_SCALEDOWN_TO_IDS)) {
byte[] ids = (byte[]) message.removeProperty(Message.HDR_SCALEDOWN_TO_IDS);
byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS);
if (ids != null) {
ByteBuffer buffer = ByteBuffer.wrap(ids);
@ -255,7 +254,6 @@ public final class BindingsImpl implements Bindings {
}
}
}
}
boolean routed = false;
@ -270,10 +268,10 @@ public final class BindingsImpl implements Bindings {
if (!routed) {
// Remove the ids now, in order to avoid double check
byte[] ids = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_IDS);
ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking
SimpleString groupId = message.getSimpleStringProperty(Message.HDR_GROUP_ID);
SimpleString groupId = message.getGroupID();
if (ids != null) {
routeFromCluster(message, context, ids);

View File

@ -820,8 +820,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
MessageReference reference = MessageReference.Factory.createReference(message, queue);
if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
Long scheduledDeliveryTime = message.getScheduledDeliveryTime();
if (scheduledDeliveryTime != null) {
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
@ -1220,7 +1220,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AtomicBoolean startedTX) throws Exception {
// Check the DuplicateCache for the Bridge first
Object bridgeDup = message.removeDeliveryAnnoationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup;

View File

@ -131,7 +131,7 @@ public class Redistributor implements Consumer {
public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
if (!active) {
return HandleStatus.BUSY;
} else if (reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID) != null) {
} else if (reference.getMessage().getGroupID() != null) {
//we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered
return HandleStatus.NO_MATCH;
}

View File

@ -2162,7 +2162,7 @@ public class QueueImpl implements Queue {
} else {
try {
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
return ref.getMessage().getGroupID();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
return null;

View File

@ -304,7 +304,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public Object removeDeliveryAnnoationProperty(SimpleString key) {
public Object removeDeliveryAnnotationProperty(SimpleString key) {
return null;
}

View File

@ -362,7 +362,7 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
public Object removeDeliveryAnnoationProperty(SimpleString key) {
public Object removeDeliveryAnnotationProperty(SimpleString key) {
return null;
}

View File

@ -203,7 +203,7 @@ public class ConsumerTest extends ActiveMQTestBase {
return;
}
internalSend(true, true);
internalSend(2, 2);
}
@Test
@ -214,7 +214,7 @@ public class ConsumerTest extends ActiveMQTestBase {
return;
}
internalSend(false, false);
internalSend(1, 1);
}
@Test
@ -225,7 +225,7 @@ public class ConsumerTest extends ActiveMQTestBase {
return;
}
internalSend(true, false);
internalSend(2, 1);
}
@Test
@ -236,7 +236,51 @@ public class ConsumerTest extends ActiveMQTestBase {
return;
}
internalSend(false, true);
internalSend(1, 2);
}
@Test
public void testSendOpenWireReceiveAMQP() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
internalSend(3, 2);
}
@Test
public void testSendAMQPReceiveOpenWire() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
internalSend(2, 3);
}
@Test
public void testOpenWireReceiveCore() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
internalSend(3, 1);
}
@Test
public void testCoreReceiveOpenwire() throws Throwable {
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
internalSend(1, 3);
}
@ -254,14 +298,23 @@ public class ConsumerTest extends ActiveMQTestBase {
}
}
public void internalSend(boolean amqpSender, boolean amqpConsumer) throws Throwable {
ConnectionFactory factoryAMQP = new JmsConnectionFactory("amqp://localhost:61616");
ConnectionFactory factoryCore = new ActiveMQConnectionFactory();
private ConnectionFactory createFactory(int protocol) {
switch (protocol) {
case 1: return new ActiveMQConnectionFactory();// core protocol
case 2: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp
case 3: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire
default: return null;
}
}
Connection connection = (amqpSender ? factoryAMQP : factoryCore).createConnection();
public void internalSend(int protocolSender, int protocolConsumer) throws Throwable {
ConnectionFactory factorySend = createFactory(protocolSender);
ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
Connection connection = factorySend.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -323,7 +376,7 @@ public class ConsumerTest extends ActiveMQTestBase {
server.start();
}
connection = (amqpConsumer ? factoryAMQP : factoryCore).createConnection();
connection = factoryConsume.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(QUEUE.toString());