From 27008758fe7ea1290e51845576ae6f806b567340 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 14 Sep 2022 22:39:34 -0500 Subject: [PATCH] ARTEMIS-3986 CME when using LVQ The map used by LastValueQueue was inadvertently changed to a non-thread-safe implementation in 4a4765c39cb73438ea2199b6e0937566d3556c10. This resulted in an occasional ConcurrentModificationException from the hashCode implementation. This commit restores the thread-safe map implementation and adds a test which brute-forces a CME when using the non-thread-safe implementation. --- .../core/server/impl/LastValueQueue.java | 4 +- .../tests/integration/server/LVQTest.java | 71 +++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 82cc672450..52631c3a4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -17,9 +17,9 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.QueueConfiguration; @@ -54,7 +54,7 @@ import org.jboss.logging.Logger; public class LastValueQueue extends QueueImpl { private static final Logger logger = Logger.getLogger(LastValueQueue.class); - private final Map map = new HashMap<>(); + private final Map map = new ConcurrentHashMap<>(); private final SimpleString lastValueKey; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index e9a1049ebb..c12cf08be9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.artemis.tests.integration.server; +import java.util.ConcurrentModificationException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; @@ -33,6 +37,7 @@ import org.apache.activemq.artemis.core.server.impl.LastValueQueue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RetryMethod; import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; @@ -829,6 +834,72 @@ public class LVQTest extends ActiveMQTestBase { assertEquals(queue.getDeliveringSize(), 0); } + @Test + public void testConcurrency() throws Exception { + AtomicBoolean cme = new AtomicBoolean(false); + + AtomicBoolean hash = new AtomicBoolean(true); + Queue lvq = server.locateQueue(qName1); + Thread hashCodeThread = new Thread(() -> { + while (hash.get()) { + try { + int hashCode = lvq.hashCode(); + } catch (ConcurrentModificationException e) { + cme.set(true); + return; + } + } + }); + hashCodeThread.start(); + + AtomicBoolean consume = new AtomicBoolean(true); + ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1); + clientSessionTxReceives.start(); + Thread consumerThread = new Thread(() -> { + while (consume.get()) { + try { + ClientMessage m = consumer.receive(); + m.acknowledge(); + clientSessionTxReceives.commit(); + } catch (ActiveMQException e) { + e.printStackTrace(); + return; + } + } + }); + consumerThread.start(); + + ClientProducer producer = clientSessionTxSends.createProducer(address); + SimpleString lastValue = RandomUtil.randomSimpleString(); + AtomicBoolean produce = new AtomicBoolean(true); + Thread producerThread = new Thread(() -> { + for (int i = 0; !cme.get() && produce.get(); i++) { + ClientMessage m = createTextMessage(clientSession, "m" + i, false); + m.putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValue); + try { + producer.send(m); + clientSessionTxSends.commit(); + } catch (ActiveMQException e) { + e.printStackTrace(); + return; + } + } + }); + producerThread.start(); + producerThread.join(5000); + + try { + assertFalse(cme.get()); + } finally { + produce.set(false); + producerThread.join(); + consume.set(false); + consumerThread.join(); + hash.set(false); + hashCodeThread.join(); + } + } + @Override @Before public void setUp() throws Exception {