This commit is contained in:
Clebert Suconic 2019-09-11 09:41:38 -04:00
commit 02d3384b87
2 changed files with 16 additions and 18 deletions

View File

@ -2745,9 +2745,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Reference " + ref + " being expired"); logger.trace("Reference " + ref + " being expired");
} }
removeMessageReference(holder, ref); removeMessageReference(holder, ref);
handled++; handled++;
consumers.reset(); consumers.reset();
continue; continue;
@ -2778,8 +2775,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliveriesInTransit.countUp(); deliveriesInTransit.countUp();
if (!nonDestructive) {
removeMessageReference(holder, ref); removeMessageReference(holder, ref);
}
ref.setInDelivery(true); ref.setInDelivery(true);
handledconsumer = consumer; handledconsumer = consumer;
handled++; handled++;
@ -2836,10 +2834,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) { protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
if (!nonDestructive) { holder.iter.remove();
holder.iter.remove(); refRemoved(ref);
refRemoved(ref);
}
} }
private void checkDepage() { private void checkDepage() {

View File

@ -53,14 +53,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
private ConnectionSupplier CoreConnection = () -> createCoreConnection(); private ConnectionSupplier CoreConnection = () -> createCoreConnection();
protected final boolean persistenceEnabled; protected final boolean persistenceEnabled;
protected final long scanPeriod;
public JMSNonDestructiveTest(boolean persistenceEnabled) { public JMSNonDestructiveTest(boolean persistenceEnabled, long scanPeriod) {
this.persistenceEnabled = persistenceEnabled; this.persistenceEnabled = persistenceEnabled;
this.scanPeriod = scanPeriod;
} }
@Parameterized.Parameters(name = "persistenceEnabled={0}") @Parameterized.Parameters(name = "persistenceEnabled={0}, scanPeriod={1}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{false}, {true}}; Object[][] params = new Object[][]{{false, 100}, {true, 100}, {true, -1}};
return Arrays.asList(params); return Arrays.asList(params);
} }
@ -72,7 +74,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
@Override @Override
protected void addConfiguration(ActiveMQServer server) { protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(persistenceEnabled); server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
server.getConfiguration().setMessageExpiryScanPeriod(100); server.getConfiguration().setMessageExpiryScanPeriod(scanPeriod);
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true)); server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true));
@ -330,7 +332,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
assertNotNull(msg); assertNotNull(msg);
assertEquals(Integer.toString(j), msg.getText()); assertEquals(Integer.toString(j), msg.getText());
} }
TextMessage msg = (TextMessage) consumer.receive(200); TextMessage msg = (TextMessage) consumer.receiveNoWait();
assertNull(msg); assertNull(msg);
consumer.close(); consumer.close();
} }
@ -354,7 +356,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = consumerSession.createQueue(queueName); Queue consumerQueue = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
TextMessage msg = (TextMessage) consumer.receive(2000); TextMessage msg = (TextMessage) consumer.receiveNoWait();
assertNull(msg); assertNull(msg);
consumer.close(); consumer.close();
} }
@ -393,9 +395,9 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
assertEquals(Integer.toString(j), msg.getText()); assertEquals(Integer.toString(j), msg.getText());
assertEquals(Integer.toString(j), msg2.getText()); assertEquals(Integer.toString(j), msg2.getText());
} }
TextMessage msg = (TextMessage) consumer.receive(200); TextMessage msg = (TextMessage) consumer.receiveNoWait();
assertNull(msg); assertNull(msg);
TextMessage msg2 = (TextMessage) consumer2.receive(200); TextMessage msg2 = (TextMessage) consumer2.receiveNoWait();
assertNull(msg2); assertNull(msg2);
consumer.close(); consumer.close();
consumer2.close(); consumer2.close();
@ -466,7 +468,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
try (Connection connection = consumerConnectionSupplier.createConnection(); try (Connection connection = consumerConnectionSupplier.createConnection();
Session session = connection.createSession(); Session session = connection.createSession();
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName))) { MessageConsumer consumer = session.createConsumer(session.createQueue(queueName))) {
TextMessage msg = (TextMessage) consumer.receive(1000); TextMessage msg = (TextMessage) consumer.receiveNoWait();
assertNull(msg); assertNull(msg);
} }
} }