This commit is contained in:
Clebert Suconic 2019-10-21 20:46:13 -04:00
commit 6a14d424c1
2 changed files with 60 additions and 5 deletions

View File

@ -182,15 +182,13 @@ public class LastValueQueue extends QueueImpl {
@Override
protected void refRemoved(MessageReference ref) {
if (isNonDestructive()) {
removeIfCurrent(ref);
}
removeIfCurrent(ref);
super.refRemoved(ref);
}
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(ref, reason, consumer);
@ -201,7 +199,7 @@ public class LastValueQueue extends QueueImpl {
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
if (reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(tx, ref, reason, consumer);

View File

@ -26,10 +26,13 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -691,6 +694,60 @@ public class LVQTest extends ActiveMQTestBase {
assertTrue(queue.getPersistentSize() > 10 * 1024);
}
@Test
public void testDeleteReference() throws Exception {
ClientProducer producer = clientSession.createProducer(address);
ClientMessage m1 = createTextMessage(clientSession, "m1");
SimpleString rh = new SimpleString("SMID1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
m1.setBodyInputStream(createFakeLargeStream(2 * 1024));
ClientMessage m2 = clientSession.createMessage(true);
m2.setBodyInputStream(createFakeLargeStream(10 * 1024));
m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
Queue queue = server.locateQueue(qName1);
producer.send(m1);
LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
// Wait for message delivered to queue
Wait.assertTrue(() -> browserIterator.hasNext(), 10, 2);
long messageId = browserIterator.next().getMessage().getMessageID();
browserIterator.close();
queue.deleteReference(messageId);
// Wait for delete tx's afterCommit called
Wait.assertEquals(0L, () -> queue.getDeliveringSize(), 10, 2);
assertEquals(queue.getPersistentSize(), 0);
assertTrue(((LastValueQueue)queue).getLastValueKeys().isEmpty());
producer.send(m2);
// Wait for message delivered to queue
Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024, 10, 2);
assertEquals(queue.getDeliveringSize(), 0);
}
@Test
public void testChangeReferencePriority() throws Exception {
ClientProducer producer = clientSession.createProducer(address);
ClientMessage m1 = createTextMessage(clientSession, "m1");
SimpleString rh = new SimpleString("SMID1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
Queue queue = server.locateQueue(qName1);
producer.send(m1);
LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
// Wait for message delivered to queue
Wait.assertTrue(() -> browserIterator.hasNext(), 10, 2);
long messageId = browserIterator.next().getMessage().getMessageID();
browserIterator.close();
long oldSize = queue.getPersistentSize();
assertTrue(queue.changeReferencePriority(messageId, (byte) 1));
// Wait for message delivered to queue
Wait.assertEquals(oldSize, () -> queue.getPersistentSize(), 10, 2);
assertEquals(queue.getDeliveringSize(), 0);
}
@Override
@Before
public void setUp() throws Exception {