ARTEMIS-2135 Race condition on getProtonMessage / getHeader causing NPE
Test was added at commit 48d8a54135
I did not use cherry-pick from master as this is no longer an issue in master after the refactoring
done at ARTEMIS-2096.
This commit is contained in:
parent
e7d26d8bb6
commit
48e0fc8f42
|
@ -141,7 +141,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
this(0, message);
|
||||
}
|
||||
|
||||
public MessageImpl getProtonMessage() {
|
||||
public synchronized MessageImpl getProtonMessage() {
|
||||
if (protonMessage == null) {
|
||||
protonMessage = (MessageImpl) Message.Factory.create();
|
||||
|
||||
|
|
|
@ -26,6 +26,9 @@ import static org.junit.Assert.fail;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
|
@ -75,6 +78,64 @@ public class AMQPMessageTest {
|
|||
assertEquals("someNiceLocal", decoded.getAddress());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecodeMultiThreaded() throws Exception {
|
||||
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
|
||||
protonMessage.setHeader( new Header());
|
||||
Properties properties = new Properties();
|
||||
properties.setTo("someNiceLocal");
|
||||
protonMessage.setProperties(properties);
|
||||
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
|
||||
protonMessage.getHeader().setDurable(Boolean.TRUE);
|
||||
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
|
||||
|
||||
final AtomicInteger failures = new AtomicInteger(0);
|
||||
|
||||
|
||||
for (int testTry = 0; testTry < 100; testTry++) {
|
||||
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
|
||||
Thread[] threads = new Thread[100];
|
||||
|
||||
CountDownLatch latchAlign = new CountDownLatch(threads.length);
|
||||
CountDownLatch go = new CountDownLatch(1);
|
||||
|
||||
Runnable run = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
latchAlign.countDown();
|
||||
go.await();
|
||||
|
||||
Assert.assertNotNull(decoded.getHeader());
|
||||
// this is a method used by Core Converter
|
||||
decoded.getProtonMessage();
|
||||
Assert.assertNotNull(decoded.getHeader());
|
||||
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
failures.incrementAndGet();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i] = new Thread(run);
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
Assert.assertTrue(latchAlign.await(10, TimeUnit.SECONDS));
|
||||
go.countDown();
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.join(5000);
|
||||
Assert.assertFalse(thread.isAlive());
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, failures.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplicationPropertiesReencode() {
|
||||
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
|
||||
|
|
Loading…
Reference in New Issue