ARTEMIS-2135 Race condition on getProtonMessage / getHeader causing NPE

Test was added at commit 48d8a54135

I did not use cherry-pick from master as this is no longer an issue in master after the refactoring
done at ARTEMIS-2096.
This commit is contained in:
Clebert Suconic 2018-10-17 17:05:45 -04:00
parent e7d26d8bb6
commit 48e0fc8f42
2 changed files with 62 additions and 1 deletions

View File

@ -141,7 +141,7 @@ public class AMQPMessage extends RefCountMessage {
this(0, message);
}
public MessageImpl getProtonMessage() {
public synchronized MessageImpl getProtonMessage() {
if (protonMessage == null) {
protonMessage = (MessageImpl) Message.Factory.create();

View File

@ -26,6 +26,9 @@ import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -75,6 +78,64 @@ public class AMQPMessageTest {
assertEquals("someNiceLocal", decoded.getAddress());
}
@Test
public void testDecodeMultiThreaded() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
final AtomicInteger failures = new AtomicInteger(0);
for (int testTry = 0; testTry < 100; testTry++) {
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
Thread[] threads = new Thread[100];
CountDownLatch latchAlign = new CountDownLatch(threads.length);
CountDownLatch go = new CountDownLatch(1);
Runnable run = new Runnable() {
@Override
public void run() {
try {
latchAlign.countDown();
go.await();
Assert.assertNotNull(decoded.getHeader());
// this is a method used by Core Converter
decoded.getProtonMessage();
Assert.assertNotNull(decoded.getHeader());
} catch (Throwable e) {
e.printStackTrace();
failures.incrementAndGet();
}
}
};
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(run);
threads[i].start();
}
Assert.assertTrue(latchAlign.await(10, TimeUnit.SECONDS));
go.countDown();
for (Thread thread : threads) {
thread.join(5000);
Assert.assertFalse(thread.isAlive());
}
Assert.assertEquals(0, failures.get());
}
}
@Test
public void testApplicationPropertiesReencode() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();