This commit is contained in:
Justin Bertram 2022-09-15 11:11:20 -05:00
commit 0c95fff865
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
2 changed files with 73 additions and 2 deletions

View File

@ -17,9 +17,9 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -54,7 +54,7 @@ import org.jboss.logging.Logger;
public class LastValueQueue extends QueueImpl {
private static final Logger logger = Logger.getLogger(LastValueQueue.class);
private final Map<SimpleString, MessageReference> map = new HashMap<>();
private final Map<SimpleString, MessageReference> map = new ConcurrentHashMap<>();
private final SimpleString lastValueKey;

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.server;
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -33,6 +37,7 @@ import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryMethod;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@ -829,6 +834,72 @@ public class LVQTest extends ActiveMQTestBase {
assertEquals(queue.getDeliveringSize(), 0);
}
@Test
public void testConcurrency() throws Exception {
AtomicBoolean cme = new AtomicBoolean(false);
AtomicBoolean hash = new AtomicBoolean(true);
Queue lvq = server.locateQueue(qName1);
Thread hashCodeThread = new Thread(() -> {
while (hash.get()) {
try {
int hashCode = lvq.hashCode();
} catch (ConcurrentModificationException e) {
cme.set(true);
return;
}
}
});
hashCodeThread.start();
AtomicBoolean consume = new AtomicBoolean(true);
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
clientSessionTxReceives.start();
Thread consumerThread = new Thread(() -> {
while (consume.get()) {
try {
ClientMessage m = consumer.receive();
m.acknowledge();
clientSessionTxReceives.commit();
} catch (ActiveMQException e) {
e.printStackTrace();
return;
}
}
});
consumerThread.start();
ClientProducer producer = clientSessionTxSends.createProducer(address);
SimpleString lastValue = RandomUtil.randomSimpleString();
AtomicBoolean produce = new AtomicBoolean(true);
Thread producerThread = new Thread(() -> {
for (int i = 0; !cme.get() && produce.get(); i++) {
ClientMessage m = createTextMessage(clientSession, "m" + i, false);
m.putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValue);
try {
producer.send(m);
clientSessionTxSends.commit();
} catch (ActiveMQException e) {
e.printStackTrace();
return;
}
}
});
producerThread.start();
producerThread.join(5000);
try {
assertFalse(cme.get());
} finally {
produce.set(false);
producerThread.join();
consume.set(false);
consumerThread.join();
hash.set(false);
hashCodeThread.join();
}
}
@Override
@Before
public void setUp() throws Exception {