diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 7f59ef7e8a..2ba9676ab4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.protocol.amqp.connect.mirror; import java.util.List; +import java.util.function.BooleanSupplier; +import java.util.function.ToIntFunction; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; @@ -354,18 +356,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } - private void performAckOnPage(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation) { - if (targetQueue.getPagingStore().isPaging()) { - PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation); - targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck); - targetQueue.getPageSubscription().performScanAck(); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Post ack Server " + server + " could not find messageID = " + messageID + - " representing nodeID=" + nodeID); - } - OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation); - } + + public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) { + PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation); + targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck, pageAck); + targetQueue.getPageSubscription().performScanAck(); } private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) { @@ -487,23 +482,44 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement public void sendMessage(Message message, RoutingContext context, List refs) { } - // I need a supress warning annotation here - // because errorProne is issuing an error her, however I really intend to compare PageACK against PagedReference - @SuppressWarnings("ComparableType") - class PageAck implements Comparable, Runnable { + class PageAck implements ToIntFunction, BooleanSupplier, Runnable { + final Queue targetQueue; final String nodeID; final long messageID; - final ACKMessageOperation operation; + final IOCallback operation; - PageAck(String nodeID, long messageID, ACKMessageOperation operation) { + PageAck(Queue targetQueue, String nodeID, long messageID, IOCallback operation) { + this.targetQueue = targetQueue; this.nodeID = nodeID; this.messageID = messageID; this.operation = operation; } + /** + * Method to retry the ack before a scan + * @return + */ @Override - public int compareTo(PagedReference reference) { + public boolean getAsBoolean() { + try { + recoverContext(); + MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); + if (reference == null) { + return false; + } else { + targetQueue.acknowledge(reference); + OperationContextImpl.getContext().executeOnCompletion(operation); + return true; + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + return false; + } + } + + @Override + public int applyAsInt(PagedReference reference) { String refNodeID = referenceNodeStore.getServerID(reference); long refMessageID = referenceNodeStore.getID(reference); if (refNodeID == null) { @@ -526,7 +542,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement @Override public void run() { - operation.connectionRun(); + operation.done(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 17d6b8f91d..aca83dea29 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -214,6 +214,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH return protonSession; } + public Map getSessions() { + return sessions; + } + public SecurityAuth getSecurityAuth() { return new LocalSecurity(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index 22626ce804..68bba3047f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -275,6 +275,10 @@ public class AMQPSessionContext extends ProtonInitializable { return receivers.size(); } + public Map getReceivers() { + return receivers; + } + public int getSenderCount() { return senders.size(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 4d7bd7bca6..8feca3fbd4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.paging.cursor; +import java.util.function.BooleanSupplier; +import java.util.function.ToIntFunction; + import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -85,11 +88,19 @@ public interface PageSubscription { // for internal (cursor) classes void confirmPosition(Transaction tx, PagePosition position) throws Exception; + /** + * Add a scanFunction represented by a ToIntFunction + * the execution will be done when you call {@link #performScanAck()} + * @param retryBeforeScan if this function is called and returns true, the scan for this element will not be called. It would be caller's responsibility to call found. + * @param scanFunction + * @param found + * @param notFound + */ + void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction scanFunction, Runnable found, Runnable notFound); - // Add a scan function to be performed. It will be completed when you call performScan - void addScanAck(Comparable scanFunction, Runnable found, Runnable notfound); - - // it will schedule a scan on pages for everything that was added through addScanAck + /** + * It will perform all scans added by {@link #addScanAck(BooleanSupplier, ToIntFunction, Runnable, Runnable)} + */ void performScanAck(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 44877866ea..cb18a46aa2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -32,6 +32,8 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; +import java.util.function.ToIntFunction; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; @@ -117,11 +119,12 @@ public final class PageSubscriptionImpl implements PageSubscription { private final LinkedList scanList = new LinkedList(); private static class PageScan { - final Comparable scanFunction; + final BooleanSupplier retryBeforeScan; + final ToIntFunction scanFunction; final Runnable found; - final Runnable notfound; + final Runnable notFound; - public Comparable getScanFunction() { + public ToIntFunction getScanFunction() { return scanFunction; } @@ -130,19 +133,20 @@ public final class PageSubscriptionImpl implements PageSubscription { } public Runnable getNotfound() { - return notfound; + return notFound; } - PageScan(Comparable scanFunction, Runnable found, Runnable notfound) { + PageScan(BooleanSupplier retryBeforeScan, ToIntFunction scanFunction, Runnable found, Runnable notFound) { + this.retryBeforeScan = retryBeforeScan; this.scanFunction = scanFunction; this.found = found; - this.notfound = notfound; + this.notFound = notFound; } } @Override - public void addScanAck(Comparable scanFunction, Runnable found, Runnable notfound) { - PageScan scan = new PageScan(scanFunction, found, notfound); + public void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction scanFunction, Runnable found, Runnable notFound) { + PageScan scan = new PageScan(retryBeforeScan, scanFunction, found, notFound); synchronized (scanList) { scanList.add(scan); } @@ -153,7 +157,7 @@ public final class PageSubscriptionImpl implements PageSubscription { // we should only have a max of 2 scheduled tasks // one that's might still be currently running, and another one lined up // no need for more than that - if (scheduledScanCount.incrementAndGet() < 2) { + if (scheduledScanCount.incrementAndGet() <= 2) { executor.execute(this::actualScanAck); } else { scheduledScanCount.decrementAndGet(); @@ -171,6 +175,32 @@ public final class PageSubscriptionImpl implements PageSubscription { scanList.clear(); } + int retriedFound = 0; + for (int i = 0; i < localScanList.length; i++) { + PageScan scanElemen = localScanList[i]; + if (scanElemen.retryBeforeScan != null && scanElemen.retryBeforeScan.getAsBoolean()) { + localScanList[i] = null; + retriedFound++; + } + } + + if (retriedFound == localScanList.length) { + return; + } + + if (!isPaging()) { + // this would mean that between the submit and now the system left paging mode + // at this point we will just return everything as notFound + for (int i = 0; i < localScanList.length; i++) { + PageScan scanElemen = localScanList[i]; + if (scanElemen != null && scanElemen.notFound != null) { + scanElemen.notFound.run(); + } + } + + return; + } + LinkedList afterCommitList = new LinkedList<>(); TransactionImpl tx = new TransactionImpl(store); tx.addOperation(new TransactionOperationAbstract() { @@ -196,7 +226,7 @@ public final class PageSubscriptionImpl implements PageSubscription { continue; } - int result = scanElemen.scanFunction.compareTo(reference); + int result = scanElemen.scanFunction.applyAsInt(reference); if (result >= 0) { if (result == 0) { @@ -209,8 +239,8 @@ public final class PageSubscriptionImpl implements PageSubscription { logger.warn(e.getMessage(), e); } } else { - if (scanElemen.notfound != null) { - scanElemen.notfound.run(); + if (scanElemen.notFound != null) { + scanElemen.notFound.run(); } } localScanList[i] = null; @@ -228,8 +258,8 @@ public final class PageSubscriptionImpl implements PageSubscription { } for (int i = 0; i < localScanList.length; i++) { - if (localScanList[i] != null && localScanList[i].notfound != null) { - localScanList[i].notfound.run(); + if (localScanList[i] != null && localScanList[i].notFound != null) { + localScanList[i].notFound.run(); } localScanList[i] = null; } @@ -241,6 +271,8 @@ public final class PageSubscriptionImpl implements PageSubscription { logger.warn(e.getMessage(), e); } } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); } finally { scheduledScanCount.decrementAndGet(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index b024235b03..12db46f2a1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -640,7 +640,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { server_2.startBrokerConnection(brokerConnectionName); } - snfreplica = server_2.locateQueue(replica.getMirrorSNF()); + Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF())); if (pagingTarget) { assertTrue(queueOnServer1.getPagingStore().isPaging()); @@ -852,7 +852,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { int LAST_ID, int port, boolean assertNull) throws JMSException { - ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port + "?jms.prefetchPolicy.all=0"); + ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port); Connection conn = cf.createConnection(); Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java index c894bf9588..f899dd112d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp.connect; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -26,12 +27,24 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.io.File; import java.util.HashMap; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; @@ -77,6 +90,7 @@ public class PagedMirrorTest extends ActiveMQTestBase { String secondConsumeURI = "tcp://localhost:61617"; Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null); + Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null); org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other"); Assert.assertNotNull(snf1); @@ -152,6 +166,128 @@ public class PagedMirrorTest extends ActiveMQTestBase { } } + + @Test + public void testAckWithScan() throws Throwable { + String sendURI = "tcp://localhost:61616"; + String consumeURI = "tcp://localhost:61617"; + + Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null); + Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null); + + org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other"); + Assert.assertNotNull(snf1); + + org.apache.activemq.artemis.core.server.Queue snf2 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other"); + Assert.assertNotNull(snf2); + + File countJournalLocation = server1.getConfiguration().getJournalLocation(); + Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory()); + String protocol = "amqp"; + + ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI); + + String bodyBuffer; + { + StringBuffer buffer = new StringBuffer(); + for (int i = 0; i < 1024; i++) { + buffer.append("*"); + } + bodyBuffer = buffer.toString(); + } + + int NUMBER_OF_MESSAGES = 200; + + try (Connection sendConnecton = sendCF.createConnection()) { + Session sendSession = sendConnecton.createSession(true, Session.SESSION_TRANSACTED); + Queue jmsQueue = sendSession.createQueue("someQueue"); + MessageProducer producer = sendSession.createProducer(jmsQueue); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = sendSession.createTextMessage(bodyBuffer); + message.setIntProperty("i", i); + producer.send(message); + } + sendSession.commit(); + } + + Wait.assertEquals(0, snf1::getMessageCount); + Wait.assertEquals(0, snf2::getMessageCount); + + org.apache.activemq.artemis.core.server.Queue serverQueue2 = server2.locateQueue("someQueue"); + Assert.assertNotNull(serverQueue2); + org.apache.activemq.artemis.core.server.Queue serverQueue1 = server1.locateQueue("someQueue"); + Assert.assertNotNull(serverQueue1); + + Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue2::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue1::getMessageCount); + + + ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI); + try (Connection connection = consumeCF.createConnection()) { + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue jmsQueue = session.createQueue("someQueue"); + MessageConsumer consumer = session.createConsumer(jmsQueue); + for (int i = 0; i < 10; i++) { + Message recMessage = consumer.receive(5000); + Assert.assertNotNull(recMessage); + } + session.commit(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES - 10, serverQueue2::getMessageCount); + + LinkedList refs = new LinkedList<>(); + serverQueue2.forEach((t) -> refs.add(t)); + + AMQPMirrorControllerTarget controllerTarget = locateMirrorTarget(server2); + + CountDownLatch latch = new CountDownLatch(refs.size()); + + IOCallback callback = new IOCallback() { + @Override + public void done() { + latch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + } + }; + + for (MessageReference r : refs) { + Long messageID = (Long)r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-mr-id")); + Object nodeID = r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-bkr-id")); + + // this will force the retry on the queue after a depage happened + controllerTarget.performAckOnPage(nodeID.toString(), messageID, serverQueue2, callback); + } + + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + + Wait.assertEquals(NUMBER_OF_MESSAGES - 10 - refs.size(), serverQueue2::getMessageCount); + } + + protected static AMQPMirrorControllerTarget locateMirrorTarget(ActiveMQServer server) { + ActiveMQServerImpl theServer = (ActiveMQServerImpl) server; + + for (RemotingConnection connection : theServer.getRemotingService().getConnections()) { + if (connection instanceof ActiveMQProtonRemotingConnection) { + ActiveMQProtonRemotingConnection protonRC = (ActiveMQProtonRemotingConnection) connection; + for (AMQPSessionContext sessionContext : protonRC.getAmqpConnection().getSessions().values()) { + for (ProtonAbstractReceiver receiver : sessionContext.getReceivers().values()) { + if (receiver instanceof AMQPMirrorControllerTarget) { + return (AMQPMirrorControllerTarget) receiver; + } + } + } + } + } + + return null; + } + private int acksCount(File countJournalLocation) throws Exception { HashMap countJournal = countJournal(countJournalLocation, 10485760, 2, 2); AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java index 2b907d9b76..cf36a84f7e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java @@ -24,6 +24,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.ToIntFunction; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -38,6 +39,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.Wait; import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Before; @@ -120,24 +122,28 @@ public class PageAckScanTest extends ActiveMQTestBase { session.commit(); } - queue.forEach((r) -> System.out.println("ref -> " + r.getMessage().getIntProperty("i"))); - AtomicInteger errors = new AtomicInteger(0); + ReusableLatch latch = new ReusableLatch(4); + Runnable done = latch::countDown; + Runnable notFound = () -> { errors.incrementAndGet(); done.run(); }; + + AtomicInteger retried = new AtomicInteger(0); PageSubscription subscription = queue.getPageSubscription(); - subscription.addScanAck(new CompareI(15), done, notFound); - subscription.addScanAck(new CompareI(11), done, notFound); - subscription.addScanAck(new CompareI(99), done, notFound); - subscription.addScanAck(new CompareI(-30), done, notFound); - System.out.println("Performing scan..."); + subscription.addScanAck(() -> false, new CompareI(15), done, notFound); + subscription.addScanAck(() -> false, new CompareI(11), done, notFound); + subscription.addScanAck(() -> false, new CompareI(99), done, notFound); + subscription.addScanAck(() -> false, new CompareI(-30), done, notFound); + subscription.addScanAck(() -> true, new CompareI(333), retried::incrementAndGet, notFound); subscription.performScanAck(); Assert.assertTrue(latch.await(5, TimeUnit.MINUTES)); Assert.assertEquals(2, errors.get()); + Wait.assertEquals(1, retried::get); try (Connection connection = factory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -153,17 +159,14 @@ public class PageAckScanTest extends ActiveMQTestBase { } } - // Errorprone would barf at this. it was really intended - @SuppressWarnings("ComparableType") - class CompareI implements Comparable { + class CompareI implements ToIntFunction { final int i; CompareI(int i) { this.i = i; } @Override - public int compareTo(PagedReference ref) { - System.out.println("Comparing against " + ref.getMessage().getIntProperty("i")); + public int applyAsInt(PagedReference ref) { return ref.getMessage().getIntProperty("i").intValue() - i; } }