ARTEMIS-3310 Paging could lose AMQPmessage if match didn't work for any reason

This commit is contained in:
Clebert Suconic 2021-05-20 09:47:03 -04:00 committed by clebertsuconic
parent 9e9e279d42
commit 3f38be8c08
5 changed files with 237 additions and 4 deletions

View File

@ -1423,10 +1423,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
} }
if (valid) { if (valid) {
match = match(message.getMessage()); if (browsing) {
match = match(message.getMessage());
if (!browsing && !match) { } else {
processACK(message.getPosition()); // if not browsing, we will just trust the routing on the queue
match = true;
} }
} else if (!browsing && ignored) { } else if (!browsing && ignored) {
positionIgnored(message.getPosition()); positionIgnored(message.getPosition());

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
public class ParseAppMultiThread {
@Test
public void testMultiThreadParsing() throws Exception {
for (int rep = 0; rep < 500; rep++) {
String randomStr = RandomUtil.randomString();
HashMap map = new HashMap();
map.put("color", randomStr);
for (int i = 0; i < 10; i++) {
map.put("stuff" + i, "value" + i); // just filling stuff
}
AMQPStandardMessage originalMessage = AMQPStandardMessage.createMessage(1, 0, SimpleString.toSimpleString("duh"), null, null, null, null, map, null, null);
// doing a round trip that would be made through persistence
AMQPMessagePersisterV2 persister = AMQPMessagePersisterV2.getInstance();
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
persister.encode(buffer, originalMessage);
buffer.readerIndex(1);
AMQPStandardMessage amqpStandardMessage = (AMQPStandardMessage) persister.decode(buffer, null, null);
if (rep == 0) {
// it is enough to check the first time only
// this is to make sure the message does not have application properties parsed
Field field = AMQPMessage.class.getDeclaredField("applicationProperties");
field.setAccessible(true);
Assert.assertNull(field.get(amqpStandardMessage));
}
Thread[] threads = new Thread[0];
CyclicBarrier barrier = threads.length > 0 ? new CyclicBarrier(threads.length) : null;
AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < threads.length; i++) {
Runnable r = () -> {
try {
barrier.await();
Assert.assertEquals(randomStr, amqpStandardMessage.getObjectProperty(SimpleString.toSimpleString("color")));
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
};
threads[i] = new Thread(r);
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
Assert.assertEquals(randomStr, amqpStandardMessage.getObjectPropertyForFilter(SimpleString.toSimpleString("color")));
Assert.assertEquals(0, errors.get());
}
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Test;
public class AmqpFilterChangePageTest extends ActiveMQTestBase {
ActiveMQServer server;
@Test
public void testChangingMatching() throws Exception {
Configuration config = createDefaultConfig(true);
int NUMBER_OF_MESSAGES = 2000;
server = createServer(true, config, 100 * 1024, 1024 * 1024);
server.start();
server.addAddressInfo(new AddressInfo("AD1").addRoutingType(RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration("Q1").setAddress("AD1").setDurable(true).setFilterString("color='red'"));
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createTopic("AD1"));
Queue queue = server.locateQueue("Q1");
queue.getPagingStore().startPaging();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = session.createTextMessage("hello " + i);
message.setStringProperty("color", "red");
producer.send(message);
if (i % 100 == 0 && i > 0) {
session.commit();
queue.getPagingStore().forceAnotherPage();
}
}
session.commit();
PageSubscriptionImpl subscription = (PageSubscriptionImpl) queue.getPageSubscription();
Field subscriptionField = PageSubscriptionImpl.class.getDeclaredField("filter");
subscriptionField.setAccessible(true);
// Replacing the filter for that won't work
// The system should still respect the original routing
// This is because if something happened to the message in which parsing did not work
// the routing should still be respected
subscriptionField.set(subscription, new Filter() {
@Override
public boolean match(org.apache.activemq.artemis.api.core.Message message) {
return false;
}
@Override
public boolean match(Map<String, String> map) {
return false;
}
@Override
public boolean match(Filterable filterable) {
return false;
}
@Override
public SimpleString getFilterString() {
return null;
}
});
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue("AD1::Q1"));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
}
session.commit();
Assert.assertNull(consumer.receiveNoWait());
connection.close();
}
}

View File

@ -236,6 +236,10 @@ under the License.
<filter string="(color='red' OR color='blue')" /> <filter string="(color='red' OR color='blue')" />
<durable>true</durable> <durable>true</durable>
</queue> </queue>
<queue name="MMDeadConsumer" group-buckets="64" group-rebalance="true">
<filter string="(color='yellow' OR color='white')" />
<durable>true</durable>
</queue>
</multicast> </multicast>
</address> </address>
</addresses> </addresses>

View File

@ -193,6 +193,8 @@ public class MMSFactoryTest extends SmokeTestBase {
consumers[i] = startConsumerProcess(theprotocol, timeForConsumers[i], "MMFactory::MMConsumer", 100, i); consumers[i] = startConsumerProcess(theprotocol, timeForConsumers[i], "MMFactory::MMConsumer", 100, i);
} }
Process deadConsumer = startConsumerProcess(theprotocol, 0, "MMFactory::MMDeadConsumer", 100, 0);
Process dlqProcess = startConsumerProcess(theprotocol, 0, "DLQ", 100, 1000); Process dlqProcess = startConsumerProcess(theprotocol, 0, "DLQ", 100, 1000);
AtomicInteger retryNumber = new AtomicInteger(0); AtomicInteger retryNumber = new AtomicInteger(0);
@ -290,6 +292,7 @@ public class MMSFactoryTest extends SmokeTestBase {
Thread.sleep(1000); Thread.sleep(1000);
} finally { } finally {
deadConsumer.destroyForcibly();
for (Process c : consumers) { for (Process c : consumers) {
c.destroyForcibly(); c.destroyForcibly();
} }