ARTEMIS-4259 JMS consumer + FQQN + selector not working

co-authored with Justin Bertram
This commit is contained in:
Clebert Suconic 2023-06-06 15:42:02 -04:00 committed by clebertsuconic
parent f03b775ac5
commit cea9ff6667
7 changed files with 524 additions and 27 deletions

View File

@ -840,26 +840,15 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new RuntimeException("Subscription name cannot be null for durable topic consumer");
// Non durable sub
queueName = new SimpleString(UUID.randomUUID().toString());
if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
if (CompositeAddress.isFullyQualified(dest.getAddress())) {
queueName = createFQQNSubscription(dest, coreFilterString, response);
} else {
if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) {
if (response.isAutoCreateQueues()) {
try {
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response);
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
}
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
}
queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
queueName = new SimpleString(UUID.randomUUID().toString());
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
}
consumer = createClientConsumer(dest, queueName, null);
consumer = createClientConsumer(dest, queueName, coreFilterString);
autoDeleteQueueName = queueName;
} else {
// Durable sub
@ -928,6 +917,38 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
// This method is for the actual queue creation on the Multicast queue / subscription
private SimpleString createFQQNSubscription(ActiveMQDestination dest,
SimpleString coreFilterString,
AddressQuery response) throws ActiveMQException, JMSException {
SimpleString queueName;
queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) {
if (response.isAutoCreateQueues()) {
try {
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), coreFilterString, true, true, response);
return queueName;
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
// on this case we will switch to the regular verification to validate the coreFilterString
}
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
}
QueueQuery queueQuery = session.queueQuery(queueName);
if (!queueQuery.isExists()) {
throw new InvalidDestinationException("Destination " + queueName + " does not exist");
}
if (coreFilterString != null && queueQuery.getFilterString() != null && !coreFilterString.equals(queueQuery.getFilterString())) {
throw new JMSException(queueName + " filter mismatch [" + coreFilterString + "] is different than existing filter [" + queueQuery.getFilterString() + "]");
}
return queueName;
}
private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();

View File

@ -306,11 +306,15 @@ public class AMQPSessionCallback implements SessionCallback {
}
public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
return queueQuery(queueName, routingType, autoCreate, null);
}
public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate, SimpleString filter) throws Exception {
QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);
if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try {
serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setAutoCreated(true));
serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setFilterString(filter).setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
@ -321,7 +325,7 @@ public class AMQPSessionCallback implements SessionCallback {
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
//if routingType is null we bypass the check
if (routingType != null && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
throw new IllegalStateException("Incorrect Routing Type for queue " + queueName + ", expecting: " + routingType + " while it had " + queueQueryResult.getRoutingType());
}
}

View File

@ -29,6 +29,7 @@ import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
@ -1083,11 +1084,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
shared = hasCapabilities(SHARED, source);
global = hasCapabilities(GLOBAL, source);
final boolean isFQQN;
//find out if we have an address made up of the address and queue name, if yes then set queue name
if (CompositeAddress.isFullyQualified(source.getAddress())) {
isFQQN = true;
addressToUse = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
queueNameToUse = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
} else {
isFQQN = false;
addressToUse = SimpleString.toSimpleString(source.getAddress());
}
//check to see if the client has defined how we act
@ -1169,8 +1174,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
supportedFilters.put(filter.getKey(), filter.getValue());
}
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST, simpleStringSelector, isFQQN);
//if the address specifies a broker configured queue then we always use this, treat it as a queue
if (queue != null) {
@ -1234,10 +1239,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
if (queueNameToUse != null) {
//a queue consumer can receive from a multicast queue if it uses a fully qualified name
//setting routingType to null means do not check the routingType against the Queue's routing type.
routingTypeToUse = null;
SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null);
SimpleString matchingAnycastQueue;
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(addressToUse, queueNameToUse), null, false, null);
if (result.isExists()) {
// if the queue exists and we're using FQQN then just ignore the routing-type
routingTypeToUse = null;
}
matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, routingTypeToUse, null, false);
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue;
} else {
@ -1284,15 +1292,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType, SimpleString filter, boolean matchFilter) throws Exception {
if (queueName != null) {
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true);
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true, filter);
if (!result.isExists()) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
} else {
if (!result.getAddress().equals(address)) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
}
if (matchFilter && filter != null && result.getFilterString() != null && !filter.equals(result.getFilterString())) {
throw new ActiveMQIllegalStateException("Queue: " + queueName + " filter mismatch [" + filter + "] is different than existing filter [" + result.getFilterString() + "]");
}
return sessionSPI.getMatchingQueue(address, queueName, routingType);
}
}

View File

@ -1786,7 +1786,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void clearupOperationContext() {
server.getStorageManager().clearContext();
if (server != null && server.getStorageManager() != null) {
server.getStorageManager().clearContext();
}
}
private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception {

View File

@ -160,7 +160,7 @@ public class AMQConsumer {
if (openwireDestination.isTopic()) {
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1);
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, CompositeAddress.isFullyQualified(destinationName.toString()) ? selector : null, info.getPriority(), info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
//only advisory topic consumers need this.
((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);

View File

@ -361,6 +361,8 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
@Test
public void testAutoCreateOnReconnect() throws Exception {
Connection connection = cf.createConnection();
runAfter(() -> ((ActiveMQConnectionFactory)cf).close());
runAfter(connection::close);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -0,0 +1,456 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSFQQNConsumerTest extends MultiprotocolJMSClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
public void testFQQNTopicConsumerWithSelectorAMQP() throws Exception {
testFQQNTopicConsumerWithSelector("AMQP", true);
}
@Test
public void testFQQNTopicConsumerWithSelectorOpenWire() throws Exception {
testFQQNTopicConsumerWithSelector("OPENWIRE", false);
}
@Test
public void testFQQNTopicConsumerWithSelectorCore() throws Exception {
testFQQNTopicConsumerWithSelector("CORE", true);
}
private void testFQQNTopicConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
final String queue = "queue";
final String address = "address";
final String filter = "prop='match'";
try (Connection c = factory.createConnection()) {
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue));
MessageConsumer mc = s.createConsumer(t, filter);
Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType());
Assert.assertNotNull(serverQueue.getFilter());
Assert.assertEquals(filter, serverQueue.getFilter().getFilterString().toString());
assertEquals(filter, server.locateQueue(queue).getFilter().getFilterString().toString());
MessageProducer producer = s.createProducer(s.createTopic("address"));
Message message = s.createTextMessage("hello");
message.setStringProperty("prop", "match");
producer.send(message);
Assert.assertNotNull(mc.receive(5000));
message = s.createTextMessage("hello");
message.setStringProperty("prop", "nomatch");
producer.send(message);
if (protocol.equals("OPENWIRE")) {
Assert.assertNull(mc.receive(500)); // false negatives in openwire
} else {
Assert.assertNull(mc.receiveNoWait());
}
}
if (validateFilterChange) {
boolean thrownException = false;
try (Connection c = factory.createConnection()) {
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue));
MessageConsumer mc = s.createConsumer(t, "shouldThrowException=true");
} catch (Exception e) {
logger.debug(e.getMessage(), e);
thrownException = true;
}
Assert.assertTrue(thrownException);
// validating the case where I am adding a consumer without a filter
// on this case the consumer will have no filter, but the filter on the queue should take care of things
try (Connection c = factory.createConnection()) {
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue));
MessageConsumer mc = s.createConsumer(t);
Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
serverQueue.getConsumers().forEach(serverConsumer -> {
Assert.assertNull(serverConsumer.getFilter());
});
MessageProducer producer = s.createProducer(s.createTopic("address"));
Message message = s.createTextMessage("hello");
message.setStringProperty("prop", "match");
producer.send(message);
Assert.assertNotNull(mc.receive(5000));
message = s.createTextMessage("hello");
message.setStringProperty("prop", "nomatch");
producer.send(message);
if (protocol.equals("OPENWIRE")) {
Assert.assertNull(mc.receive(500)); // false negatives in openwire
} else {
Assert.assertNull(mc.receiveNoWait());
}
}
}
}
@Test
public void testFQQNTopicFilterConsumerOnlyAMQP() throws Exception {
testFQQNTopicFilterConsumerOnly("AMQP");
}
@Test
public void testFQQNTopicFilterConsumerOnlyOPENWIRE() throws Exception {
testFQQNTopicFilterConsumerOnly("OPENWIRE");
}
@Test
public void testFQQNTopicFilterConsumerOnlyCORE() throws Exception {
testFQQNTopicFilterConsumerOnly("CORE");
}
private void testFQQNTopicFilterConsumerOnly(String protocol) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
final String queue = "queue";
final String address = "address";
final String filter = "prop='match'";
// predefining the queue without a filter
// so consumers will filter out messages
server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration().setAddress(address).setName(queue).setRoutingType(RoutingType.MULTICAST));
try (Connection c = factory.createConnection()) {
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue));
MessageConsumer mc = s.createConsumer(t, filter);
Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType());
Assert.assertNull(serverQueue.getFilter()); // it was pre-created without a filter, so we will just filter on the consumer
Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
serverQueue.getConsumers().forEach(consumer -> {
Assert.assertNotNull(consumer.getFilter());
Assert.assertEquals(filter, consumer.getFilter().getFilterString().toString());
});
MessageProducer producer = s.createProducer(s.createTopic("address"));
Message message = s.createTextMessage("hello");
message.setStringProperty("prop", "match");
producer.send(message);
Assert.assertNotNull(mc.receive(5000));
message = s.createTextMessage("hello");
message.setStringProperty("prop", "nomatch");
producer.send(message);
if (protocol.equals("OPENWIRE")) {
assertNull(mc.receive(100)); // i have had false negatives with openwire, hence this
} else {
assertNull(mc.receiveNoWait());
}
}
}
@Test
public void testFQQNTopicConsumerDontExistAMQP() throws Exception {
testFQQNTopicConsumerDontExist("AMQP");
}
/* this commented out code is just to make a point that this test would not be valid in openwire.
As openwire is calling the method createSubscription from its 1.1 implementation.
Hence there's no need to test this over JMS1.1 with openWire
@Test
public void testFQQNTopicConsumerDontExistOPENWIRE() throws Exception {
testFQQNTopicConsumerDontExist("OPENWIRE");
} */
@Test
public void testFQQNTopicConsumerDontExistCORE() throws Exception {
testFQQNTopicConsumerDontExist("CORE");
}
private void testFQQNTopicConsumerDontExist(String protocol) throws Exception {
AddressSettings settings = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().clear();
server.getAddressSettingsRepository().addMatch("#", settings);
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
final String queue = "queue";
final String address = "address";
boolean thrownException = false;
try (Connection c = factory.createConnection()) {
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue));
MessageConsumer mc = s.createConsumer(t);
} catch (Exception e) {
logger.debug(e.getMessage(), e);
thrownException = true;
}
Assert.assertTrue(thrownException);
}
@Test
public void testFQQNQueueConsumerWithSelectorAMQP() throws Exception {
testFQQNQueueConsumerWithSelector("AMQP");
}
@Test
public void testFQQNQueueConsumerWithSelectorOpenWire() throws Exception {
testFQQNQueueConsumerWithSelector("OPENWIRE");
}
@Test
public void testFQQNQueueConsumerWithSelectorCore() throws Exception {
testFQQNQueueConsumerWithSelector("CORE");
}
private void testFQQNQueueConsumerWithSelector(String protocol) throws Exception {
AddressSettings settings = new AddressSettings().setDefaultQueueRoutingType(RoutingType.ANYCAST).setDefaultAddressRoutingType(RoutingType.ANYCAST);
server.getAddressSettingsRepository().addMatch("#", settings);
final String queue = "myQueue";
final String address = "address";
final String filter = "prop='match'";
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
try (Connection c = factory.createConnection()) {
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueQuery = CompositeAddress.toFullyQualified(address, queue) + (protocol.equals("OPENWIRE") ? "?selectorAware=true" : "");
Queue q = s.createQueue(queueQuery);
MessageConsumer mc = s.createConsumer(q, filter);
Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
Assert.assertEquals(RoutingType.ANYCAST, serverQueue.getRoutingType());
Assert.assertNull(serverQueue.getFilter());
MessageProducer p = s.createProducer(q);
Message m = s.createMessage();
m.setStringProperty("prop", "match");
p.send(m);
assertNotNull(mc.receive(1000));
m = s.createMessage();
m.setStringProperty("prop", "no-match");
p.send(m);
if (protocol.equals("OPENWIRE")) {
assertNull(mc.receive(100)); // i have had false negatives with openwire, hence this
} else {
assertNull(mc.receiveNoWait());
}
Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
serverQueue.getConsumers().forEach(queueConsumer -> {
Assert.assertNotNull(queueConsumer.getFilter());
Assert.assertEquals(filter, queueConsumer.getFilter().getFilterString().toString());
});
mc.close();
Wait.assertEquals(0, () -> serverQueue.getConsumers().size());
String invalidFilter = "notHappening=true";
mc = s.createConsumer(q, invalidFilter);
Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
serverQueue.getConsumers().forEach(queueConsumer -> {
Assert.assertNotNull(queueConsumer.getFilter());
Assert.assertEquals(invalidFilter, queueConsumer.getFilter().getFilterString().toString());
});
}
}
@Test
public void testFQQNTopicMultiConsumerWithSelectorAMQP() throws Exception {
testFQQNTopicMultiConsumerWithSelector("AMQP", true);
}
@Test
public void testFQQNTopicMultiConsumerWithSelectorOpenWire() throws Exception {
testFQQNTopicMultiConsumerWithSelector("OPENWIRE", false);
}
@Test
public void testFQQNTopicMultiConsumerWithSelectorCORE() throws Exception {
testFQQNTopicMultiConsumerWithSelector("CORE", true);
}
private void testFQQNTopicMultiConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception {
class RunnableConsumer implements Runnable {
int errors = 0;
final int expected;
final Connection c;
final Session session;
final Topic topic;
final MessageConsumer consumer;
final String queueName;
final String filter;
final CountDownLatch done;
RunnableConsumer(Connection c, String queueName, int expected, String filter, CountDownLatch done) throws Exception {
this.c = c;
this.session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.queueName = queueName;
this.expected = expected;
this.topic = session.createTopic(queueName);
this.consumer = session.createConsumer(topic, filter);
this.done = done;
this.filter = filter;
}
@Override
public void run() {
try {
for (int i = 0; i < expected; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
logger.debug("Queue {} received message {}", queueName, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
Assert.assertNotNull(message);
}
if (protocol.equals("OPENWIRE")) {
Assert.assertNull(consumer.receive(500)); // false negatives in openwire
} else {
Assert.assertNull(consumer.receiveNoWait());
}
} catch (Throwable e) {
errors++;
logger.warn(e.getMessage(), e);
} finally {
done.countDown();
}
}
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
final String address = "address";
int threads = 10;
ExecutorService executor = Executors.newFixedThreadPool(threads);
runAfter(executor::shutdownNow);
try (Connection c = factory.createConnection()) {
c.start();
CountDownLatch doneLatch = new CountDownLatch(threads);
RunnableConsumer[] consumers = new RunnableConsumer[threads];
for (int i = 0; i < threads; i++) {
consumers[i] = new RunnableConsumer(c, address + "::" + "queue" + i, i * 10, "i < " + (i * 10), doneLatch);
}
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = s.createProducer(s.createTopic(address));
for (int i = 0; i < threads * 10; i++) {
Message message = s.createTextMessage("i=" + i);
message.setIntProperty("i", i);
p.send(message);
}
for (RunnableConsumer consumer : consumers) {
executor.execute(consumer);
}
Assert.assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
for (RunnableConsumer consumer : consumers) {
Assert.assertEquals("Error on consumer for queue " + consumer.queueName, 0, consumer.errors);
}
}
}
}