ARTEMIS-2478 Expired message not removed in non destructive queue
This commit is contained in:
parent
a8d68d9dd7
commit
e43c5390cf
|
@ -2745,9 +2745,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
logger.trace("Reference " + ref + " being expired");
|
||||
}
|
||||
removeMessageReference(holder, ref);
|
||||
|
||||
|
||||
|
||||
handled++;
|
||||
consumers.reset();
|
||||
continue;
|
||||
|
@ -2778,8 +2775,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
deliveriesInTransit.countUp();
|
||||
|
||||
|
||||
removeMessageReference(holder, ref);
|
||||
if (!nonDestructive) {
|
||||
removeMessageReference(holder, ref);
|
||||
}
|
||||
ref.setInDelivery(true);
|
||||
handledconsumer = consumer;
|
||||
handled++;
|
||||
|
@ -2836,10 +2834,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
|
||||
protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
|
||||
if (!nonDestructive) {
|
||||
holder.iter.remove();
|
||||
refRemoved(ref);
|
||||
}
|
||||
holder.iter.remove();
|
||||
refRemoved(ref);
|
||||
}
|
||||
|
||||
private void checkDepage() {
|
||||
|
|
|
@ -53,14 +53,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
|
|||
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
|
||||
|
||||
protected final boolean persistenceEnabled;
|
||||
protected final long scanPeriod;
|
||||
|
||||
public JMSNonDestructiveTest(boolean persistenceEnabled) {
|
||||
public JMSNonDestructiveTest(boolean persistenceEnabled, long scanPeriod) {
|
||||
this.persistenceEnabled = persistenceEnabled;
|
||||
this.scanPeriod = scanPeriod;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "persistenceEnabled={0}")
|
||||
@Parameterized.Parameters(name = "persistenceEnabled={0}, scanPeriod={1}")
|
||||
public static Collection<Object[]> data() {
|
||||
Object[][] params = new Object[][]{{false}, {true}};
|
||||
Object[][] params = new Object[][]{{false, 100}, {true, 100}, {true, -1}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
|
@ -72,7 +74,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
|
|||
@Override
|
||||
protected void addConfiguration(ActiveMQServer server) {
|
||||
server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
|
||||
server.getConfiguration().setMessageExpiryScanPeriod(100);
|
||||
server.getConfiguration().setMessageExpiryScanPeriod(scanPeriod);
|
||||
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
|
||||
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));
|
||||
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true));
|
||||
|
|
Loading…
Reference in New Issue