ARTEMIS-2459 Fix err in the replacement of a non-destructively consumed LVQ message

This commit is contained in:
Wei Yang 2019-08-21 20:25:24 +08:00 committed by Clebert Suconic
parent f2557c27ba
commit 510339423e
4 changed files with 61 additions and 18 deletions

View File

@ -187,12 +187,23 @@ public class LastValueQueue extends QueueImpl {
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED ) {
if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(ref, reason, consumer);
}
@Override
public void acknowledge(Transaction tx,
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(tx, ref, reason, consumer);
}
private synchronized void removeIfCurrent(MessageReference ref) {
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {

View File

@ -1593,28 +1593,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
RefsOperation refsOperation = getRefsOperation(tx, reason);
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
refsOperation.addAck(ref);
if (nonDestructive && reason == AckReason.NORMAL) {
refsOperation.addOnlyRefAck(ref);
if (logger.isDebugEnabled()) {
logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
}
} else {
Message message = ref.getMessage();
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
boolean durableRef = message.isDurable() && isDurable();
refsOperation.addAck(ref);
} else {
Message message = ref.getMessage();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
boolean durableRef = message.isDurable() && isDurable();
tx.setContainsPersistent();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
tx.setContainsPersistent();
}
ackAttempts.incrementAndGet();
refsOperation.addAck(ref);
}
ackAttempts.incrementAndGet();
refsOperation.addAck(ref);
}
if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}
}
}
@ -3435,6 +3442,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering(ref);
if (nonDestructive && reason == AckReason.NORMAL) {
return;
}
if (reason == AckReason.EXPIRED) {
messagesExpired.incrementAndGet();

View File

@ -64,6 +64,10 @@ public class RefsOperation extends TransactionOperationAbstract {
ignoreRedeliveryCheck = true;
}
synchronized void addOnlyRefAck(final MessageReference ref) {
refsToAck.add(ref);
}
synchronized void addAck(final MessageReference ref) {
refsToAck.add(ref);
if (ref.isPaged()) {

View File

@ -24,6 +24,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -35,7 +38,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSNonDestructiveTest extends JMSClientTestSupport {
private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE";
@ -46,6 +52,18 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
protected final boolean persistenceEnabled;
public JMSNonDestructiveTest(boolean persistenceEnabled) {
this.persistenceEnabled = persistenceEnabled;
}
@Parameterized.Parameters(name = "persistenceEnabled={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{false}, {true}};
return Arrays.asList(params);
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
@ -53,7 +71,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(false);
server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));