This closes #3424
This commit is contained in:
commit
36b3289571
|
@ -174,10 +174,10 @@ public class LastValueQueue extends QueueImpl {
|
|||
|
||||
map.put(prop, hr);
|
||||
|
||||
super.addTail(hr, direct);
|
||||
super.addTail(hr, isNonDestructive() ? false : direct);
|
||||
}
|
||||
} else {
|
||||
super.addTail(ref, direct);
|
||||
super.addTail(ref, isNonDestructive() ? false : direct);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -25,6 +26,10 @@ import javax.jms.Session;
|
|||
import javax.jms.TextMessage;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
|
@ -129,6 +134,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
|
|||
testNonDestructive(AMQPConnection, CoreConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonDestructiveLVQWithConsumerFirstCore() throws Exception {
|
||||
testNonDestructiveLVQWithConsumerFirst(CoreConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonDestructiveLVQWithConsumerFirstAMQP() throws Exception {
|
||||
testNonDestructiveLVQWithConsumerFirst(AMQPConnection);
|
||||
}
|
||||
|
||||
public void testNonDestructive(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||
testNonDestructiveSingle(producerConnectionSupplier, consumerConnectionSupplier);
|
||||
testNonDestructiveDualConsumer(producerConnectionSupplier, consumerConnectionSupplier);
|
||||
|
@ -286,6 +301,63 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
|
|||
assertEquals("Message count after clearing queue via queue control should be 0", 0, queueBinding.getQueue().getMessageCount());
|
||||
}
|
||||
|
||||
public void testNonDestructiveLVQWithConsumerFirst(ConnectionSupplier connectionSupplier) throws Exception {
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
CountDownLatch consumerSetup = new CountDownLatch(1);
|
||||
CountDownLatch consumerComplete = new CountDownLatch(1);
|
||||
|
||||
/*
|
||||
* Create the consumer before any messages are sent and keep it there so that the first message which arrives
|
||||
* on the queue triggers direct delivery. Without the fix in this commit this essentially "poisons" the queue
|
||||
* so that consumers can't get messages later.
|
||||
*/
|
||||
executor.submit(() -> {
|
||||
try (Connection connection = connectionSupplier.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer messageConsumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) {
|
||||
connection.start();
|
||||
consumerSetup.countDown();
|
||||
BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000);
|
||||
assertNotNull(messageReceived);
|
||||
consumerComplete.countDown();
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
|
||||
consumerComplete.countDown();
|
||||
});
|
||||
|
||||
// wait for the consumer thread to start and get everything setup
|
||||
consumerSetup.await(5, TimeUnit.SECONDS);
|
||||
|
||||
try (Connection connection = connectionSupplier.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||
MessageProducer producer = session.createProducer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME));
|
||||
BytesMessage message = session.createBytesMessage();
|
||||
message.writeUTF("mills " + System.currentTimeMillis());
|
||||
message.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME");
|
||||
producer.send(message);
|
||||
|
||||
// wait for the consumer to close then send another message
|
||||
consumerComplete.await(5, TimeUnit.SECONDS);
|
||||
|
||||
message = session.createBytesMessage();
|
||||
message.writeUTF("mills " + System.currentTimeMillis());
|
||||
message.setStringProperty("_AMQ_LVQ_NAME", "STOCK_NAME");
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
try (Connection connection = connectionSupplier.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer messageConsumer = session.createConsumer(session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME))) {
|
||||
connection.start();
|
||||
BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000);
|
||||
assertNotNull(messageReceived);
|
||||
}
|
||||
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
public void testNonDestructiveLVQTombstone(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
|
||||
int tombstoneTimeToLive = 500;
|
||||
|
||||
|
|
Loading…
Reference in New Issue