ARTEMIS-3986 CME when using LVQ

The map used by LastValueQueue was inadvertently changed to a
non-thread-safe implementation in
4a4765c39c. This resulted in an occasional
ConcurrentModificationException from the hashCode implementation.

This commit restores the thread-safe map implementation and adds a test
which brute-forces a CME when using the non-thread-safe implementation.
This commit is contained in:
Justin Bertram 2022-09-14 22:39:34 -05:00
parent c9f01cec3c
commit 27008758fe
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
2 changed files with 73 additions and 2 deletions

View File

@ -17,9 +17,9 @@
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -54,7 +54,7 @@ import org.jboss.logging.Logger;
public class LastValueQueue extends QueueImpl { public class LastValueQueue extends QueueImpl {
private static final Logger logger = Logger.getLogger(LastValueQueue.class); private static final Logger logger = Logger.getLogger(LastValueQueue.class);
private final Map<SimpleString, MessageReference> map = new HashMap<>(); private final Map<SimpleString, MessageReference> map = new ConcurrentHashMap<>();
private final SimpleString lastValueKey; private final SimpleString lastValueKey;

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq.artemis.tests.integration.server; package org.apache.activemq.artemis.tests.integration.server;
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -33,6 +37,7 @@ import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryMethod; import org.apache.activemq.artemis.utils.RetryMethod;
import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@ -829,6 +834,72 @@ public class LVQTest extends ActiveMQTestBase {
assertEquals(queue.getDeliveringSize(), 0); assertEquals(queue.getDeliveringSize(), 0);
} }
@Test
public void testConcurrency() throws Exception {
AtomicBoolean cme = new AtomicBoolean(false);
AtomicBoolean hash = new AtomicBoolean(true);
Queue lvq = server.locateQueue(qName1);
Thread hashCodeThread = new Thread(() -> {
while (hash.get()) {
try {
int hashCode = lvq.hashCode();
} catch (ConcurrentModificationException e) {
cme.set(true);
return;
}
}
});
hashCodeThread.start();
AtomicBoolean consume = new AtomicBoolean(true);
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
clientSessionTxReceives.start();
Thread consumerThread = new Thread(() -> {
while (consume.get()) {
try {
ClientMessage m = consumer.receive();
m.acknowledge();
clientSessionTxReceives.commit();
} catch (ActiveMQException e) {
e.printStackTrace();
return;
}
}
});
consumerThread.start();
ClientProducer producer = clientSessionTxSends.createProducer(address);
SimpleString lastValue = RandomUtil.randomSimpleString();
AtomicBoolean produce = new AtomicBoolean(true);
Thread producerThread = new Thread(() -> {
for (int i = 0; !cme.get() && produce.get(); i++) {
ClientMessage m = createTextMessage(clientSession, "m" + i, false);
m.putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValue);
try {
producer.send(m);
clientSessionTxSends.commit();
} catch (ActiveMQException e) {
e.printStackTrace();
return;
}
}
});
producerThread.start();
producerThread.join(5000);
try {
assertFalse(cme.get());
} finally {
produce.set(false);
producerThread.join();
consume.set(false);
consumerThread.join();
hash.set(false);
hashCodeThread.join();
}
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {