This commit is contained in:
Clebert Suconic 2019-08-22 11:50:28 -04:00
commit 777eede7ab
4 changed files with 61 additions and 18 deletions

View File

@ -193,6 +193,17 @@ public class LastValueQueue extends QueueImpl {
super.acknowledge(ref, reason, consumer); super.acknowledge(ref, reason, consumer);
} }
@Override
public void acknowledge(Transaction tx,
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(tx, ref, reason, consumer);
}
private synchronized void removeIfCurrent(MessageReference ref) { private synchronized void removeIfCurrent(MessageReference ref) {
SimpleString lastValueProp = ref.getLastValueProperty(); SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) { if (lastValueProp != null) {

View File

@ -1593,6 +1593,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception { public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
RefsOperation refsOperation = getRefsOperation(tx, reason); RefsOperation refsOperation = getRefsOperation(tx, reason);
if (nonDestructive && reason == AckReason.NORMAL) {
refsOperation.addOnlyRefAck(ref);
if (logger.isDebugEnabled()) {
logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
}
} else {
if (ref.isPaged()) { if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref); pageSubscription.ackTx(tx, (PagedReference) ref);
@ -1617,6 +1623,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer)); server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
} }
} }
}
@Override @Override
public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception { public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
@ -3435,6 +3442,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueImpl queue = (QueueImpl) ref.getQueue(); QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering(ref); queue.decDelivering(ref);
if (nonDestructive && reason == AckReason.NORMAL) {
return;
}
if (reason == AckReason.EXPIRED) { if (reason == AckReason.EXPIRED) {
messagesExpired.incrementAndGet(); messagesExpired.incrementAndGet();

View File

@ -64,6 +64,10 @@ public class RefsOperation extends TransactionOperationAbstract {
ignoreRedeliveryCheck = true; ignoreRedeliveryCheck = true;
} }
synchronized void addOnlyRefAck(final MessageReference ref) {
refsToAck.add(ref);
}
synchronized void addAck(final MessageReference ref) { synchronized void addAck(final MessageReference ref) {
refsToAck.add(ref); refsToAck.add(ref);
if (ref.isPaged()) { if (ref.isPaged()) {

View File

@ -24,6 +24,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -35,7 +38,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue; import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSNonDestructiveTest extends JMSClientTestSupport { public class JMSNonDestructiveTest extends JMSClientTestSupport {
private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE"; private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE";
@ -46,6 +52,18 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
private ConnectionSupplier AMQPConnection = () -> createConnection(); private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection(); private ConnectionSupplier CoreConnection = () -> createCoreConnection();
protected final boolean persistenceEnabled;
public JMSNonDestructiveTest(boolean persistenceEnabled) {
this.persistenceEnabled = persistenceEnabled;
}
@Parameterized.Parameters(name = "persistenceEnabled={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{false}, {true}};
return Arrays.asList(params);
}
@Override @Override
protected String getConfiguredProtocols() { protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE"; return "AMQP,OPENWIRE,CORE";
@ -53,7 +71,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
@Override @Override
protected void addConfiguration(ActiveMQServer server) { protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(false); server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
server.getConfiguration().setMessageExpiryScanPeriod(100); server.getConfiguration().setMessageExpiryScanPeriod(100);
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));