ARTEMIS-780 Update Shared Queue API to use Address model

This commit is contained in:
Martyn Taylor 2016-11-29 14:11:27 +00:00
parent c480351c11
commit 279383a798
7 changed files with 92 additions and 6 deletions

View File

@ -112,6 +112,11 @@ public interface Message {
*/
SimpleString HDR_VALIDATED_USER = new SimpleString("_AMQ_VALIDATED_USER");
/**
* The Routing Type for this message. Ensures that this message is only routed to queues with matching routing type.
*/
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
byte DEFAULT_TYPE = 0;
byte OBJECT_TYPE = 2;

View File

@ -64,7 +64,6 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
return buff.toString();
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -491,6 +492,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType);
try {
/**
* Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},

View File

@ -18,12 +18,13 @@ package org.apache.activemq.artemis.core.postoffice.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
public class LocalQueueBinding implements QueueBinding {
@ -117,12 +118,23 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public void route(final ServerMessage message, final RoutingContext context) throws Exception {
queue.route(message, context);
if (isMatchRoutingType(message)) {
queue.route(message, context);
}
}
@Override
public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
queue.routeWithAck(message, context);
if (isMatchRoutingType(message)) {
queue.routeWithAck(message, context);
}
}
private boolean isMatchRoutingType(ServerMessage message) {
if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) {
return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType();
}
return true;
}
public boolean isQueueBinding() {

View File

@ -655,6 +655,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// Sanity check
if (message.getRefCount() > 0) {
@ -663,6 +664,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
SimpleString address = message.getAddress();
if (address.toString().equals("testQueue")) {
System.out.println("f");
}
setPagingStore(message);
AtomicBoolean startedTX = new AtomicBoolean(false);

View File

@ -1542,7 +1542,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public void createSharedQueue(final SimpleString address, RoutingType routingType, final SimpleString name, final SimpleString filterString,
public void createSharedQueue(final SimpleString address,
RoutingType routingType,
final SimpleString name,
final SimpleString filterString,
final SimpleString user,
boolean durable) throws Exception {
//force the old contract about address
@ -1558,7 +1561,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, false);
final Queue queue = createQueue(address,
name,
routingType,
filterString,
user,
durable,
!durable,
true,
!durable,
false,
Queue.MAX_CONSUMERS_UNLIMITED,
false,
true);
if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);

View File

@ -25,19 +25,29 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage;
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class MessageProducerTest extends JMSTestCase {
@Test
@ -695,6 +705,42 @@ public class MessageProducerTest extends JMSTestCase {
ProxyAssertSupport.assertTrue(listener.exception instanceof javax.jms.IllegalStateException);
}
@Test
public void testSendToQueueOnlyWhenTopicWithSameAddress() throws Exception {
SimpleString addr = SimpleString.toSimpleString("testAddr");
Set<RoutingType> supportedRoutingTypes = new HashSet<>();
supportedRoutingTypes.add(RoutingType.ANYCAST);
supportedRoutingTypes.add(RoutingType.MULTICAST);
servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(addr, supportedRoutingTypes));
servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null, false, false);
Connection pconn = createConnection();
pconn.start();
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = ps.createQueue(addr.toString());
Topic topic = ps.createTopic(addr.toString());
MessageConsumer queueConsumer = ps.createConsumer(queue);
MessageConsumer topicConsumer = ps.createConsumer(topic);
MessageProducer queueProducer = ps.createProducer(queue);
queueProducer.send(ps.createMessage());
assertNotNull(queueConsumer.receive(1000));
assertNull(topicConsumer.receive(1000));
MessageProducer topicProducer = ps.createProducer(topic);
topicProducer.send(ps.createMessage());
assertNull(queueConsumer.receive(1000));
assertNotNull(topicConsumer.receive(1000));
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------