diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 67a5cf44c9..fb62eb07f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -1423,10 +1423,11 @@ public final class PageSubscriptionImpl implements PageSubscription { } if (valid) { - match = match(message.getMessage()); - - if (!browsing && !match) { - processACK(message.getPosition()); + if (browsing) { + match = match(message.getMessage()); + } else { + // if not browsing, we will just trust the routing on the queue + match = true; } } else if (!browsing && ignored) { positionIgnored(message.getPosition()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ParseAppMultiThread.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ParseAppMultiThread.java new file mode 100644 index 0000000000..9b364d1a1b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ParseAppMultiThread.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Test; + +public class ParseAppMultiThread { + + + @Test + public void testMultiThreadParsing() throws Exception { + + for (int rep = 0; rep < 500; rep++) { + String randomStr = RandomUtil.randomString(); + HashMap map = new HashMap(); + map.put("color", randomStr); + for (int i = 0; i < 10; i++) { + map.put("stuff" + i, "value" + i); // just filling stuff + } + AMQPStandardMessage originalMessage = AMQPStandardMessage.createMessage(1, 0, SimpleString.toSimpleString("duh"), null, null, null, null, map, null, null); + + + // doing a round trip that would be made through persistence + AMQPMessagePersisterV2 persister = AMQPMessagePersisterV2.getInstance(); + + ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); + persister.encode(buffer, originalMessage); + buffer.readerIndex(1); + + AMQPStandardMessage amqpStandardMessage = (AMQPStandardMessage) persister.decode(buffer, null, null); + + + if (rep == 0) { + // it is enough to check the first time only + // this is to make sure the message does not have application properties parsed + Field field = AMQPMessage.class.getDeclaredField("applicationProperties"); + field.setAccessible(true); + Assert.assertNull(field.get(amqpStandardMessage)); + } + + + Thread[] threads = new Thread[0]; + CyclicBarrier barrier = threads.length > 0 ? new CyclicBarrier(threads.length) : null; + + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i < threads.length; i++) { + Runnable r = () -> { + try { + barrier.await(); + Assert.assertEquals(randomStr, amqpStandardMessage.getObjectProperty(SimpleString.toSimpleString("color"))); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }; + + threads[i] = new Thread(r); + threads[i].start(); + } + + for (Thread t : threads) { + t.join(); + } + + Assert.assertEquals(randomStr, amqpStandardMessage.getObjectPropertyForFilter(SimpleString.toSimpleString("color"))); + Assert.assertEquals(0, errors.get()); + } + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpFilterChangePageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpFilterChangePageTest.java new file mode 100644 index 0000000000..dcd2d6dc8d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpFilterChangePageTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.reflect.Field; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.selector.filter.Filterable; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpFilterChangePageTest extends ActiveMQTestBase { + + ActiveMQServer server; + + @Test + public void testChangingMatching() throws Exception { + Configuration config = createDefaultConfig(true); + + int NUMBER_OF_MESSAGES = 2000; + + server = createServer(true, config, 100 * 1024, 1024 * 1024); + server.start(); + + server.addAddressInfo(new AddressInfo("AD1").addRoutingType(RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration("Q1").setAddress("AD1").setDurable(true).setFilterString("color='red'")); + + ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createTopic("AD1")); + + Queue queue = server.locateQueue("Q1"); + + queue.getPagingStore().startPaging(); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = session.createTextMessage("hello " + i); + message.setStringProperty("color", "red"); + producer.send(message); + if (i % 100 == 0 && i > 0) { + session.commit(); + queue.getPagingStore().forceAnotherPage(); + } + } + session.commit(); + + PageSubscriptionImpl subscription = (PageSubscriptionImpl) queue.getPageSubscription(); + Field subscriptionField = PageSubscriptionImpl.class.getDeclaredField("filter"); + subscriptionField.setAccessible(true); + + // Replacing the filter for that won't work + // The system should still respect the original routing + // This is because if something happened to the message in which parsing did not work + // the routing should still be respected + subscriptionField.set(subscription, new Filter() { + @Override + public boolean match(org.apache.activemq.artemis.api.core.Message message) { + return false; + } + + @Override + public boolean match(Map map) { + return false; + } + + @Override + public boolean match(Filterable filterable) { + return false; + } + + @Override + public SimpleString getFilterString() { + return null; + } + }); + + connection.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue("AD1::Q1")); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + } + + session.commit(); + + Assert.assertNull(consumer.receiveNoWait()); + + connection.close(); + + } + +} diff --git a/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml b/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml index b81420fba2..71e4e32f8f 100644 --- a/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/mmfactory/broker.xml @@ -236,6 +236,10 @@ under the License. true + + + true + diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java index 8756a867e7..95eec1a305 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mmfactory/MMSFactoryTest.java @@ -193,6 +193,8 @@ public class MMSFactoryTest extends SmokeTestBase { consumers[i] = startConsumerProcess(theprotocol, timeForConsumers[i], "MMFactory::MMConsumer", 100, i); } + Process deadConsumer = startConsumerProcess(theprotocol, 0, "MMFactory::MMDeadConsumer", 100, 0); + Process dlqProcess = startConsumerProcess(theprotocol, 0, "DLQ", 100, 1000); AtomicInteger retryNumber = new AtomicInteger(0); @@ -290,6 +292,7 @@ public class MMSFactoryTest extends SmokeTestBase { Thread.sleep(1000); } finally { + deadConsumer.destroyForcibly(); for (Process c : consumers) { c.destroyForcibly(); }