This commit is contained in:
Clebert Suconic 2021-09-10 18:27:00 -04:00
commit 354fa5bf15
8 changed files with 258 additions and 52 deletions

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
@ -354,18 +356,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
private void performAckOnPage(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation) {
if (targetQueue.getPagingStore().isPaging()) {
PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck);
targetQueue.getPageSubscription().performScanAck();
} else {
if (logger.isDebugEnabled()) {
logger.debug("Post ack Server " + server + " could not find messageID = " + messageID +
" representing nodeID=" + nodeID);
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
}
public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck, pageAck);
targetQueue.getPageSubscription().performScanAck();
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
@ -487,23 +482,44 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
}
// I need a supress warning annotation here
// because errorProne is issuing an error her, however I really intend to compare PageACK against PagedReference
@SuppressWarnings("ComparableType")
class PageAck implements Comparable<PagedReference>, Runnable {
class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {
final Queue targetQueue;
final String nodeID;
final long messageID;
final ACKMessageOperation operation;
final IOCallback operation;
PageAck(String nodeID, long messageID, ACKMessageOperation operation) {
PageAck(Queue targetQueue, String nodeID, long messageID, IOCallback operation) {
this.targetQueue = targetQueue;
this.nodeID = nodeID;
this.messageID = messageID;
this.operation = operation;
}
/**
* Method to retry the ack before a scan
* @return
*/
@Override
public int compareTo(PagedReference reference) {
public boolean getAsBoolean() {
try {
recoverContext();
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null) {
return false;
} else {
targetQueue.acknowledge(reference);
OperationContextImpl.getContext().executeOnCompletion(operation);
return true;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return false;
}
}
@Override
public int applyAsInt(PagedReference reference) {
String refNodeID = referenceNodeStore.getServerID(reference);
long refMessageID = referenceNodeStore.getID(reference);
if (refNodeID == null) {
@ -526,7 +542,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void run() {
operation.connectionRun();
operation.done();
}
}

View File

@ -214,6 +214,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return protonSession;
}
public Map<Session, AMQPSessionContext> getSessions() {
return sessions;
}
public SecurityAuth getSecurityAuth() {
return new LocalSecurity();
}

View File

@ -275,6 +275,10 @@ public class AMQPSessionContext extends ProtonInitializable {
return receivers.size();
}
public Map<Receiver, ProtonAbstractReceiver> getReceivers() {
return receivers;
}
public int getSenderCount() {
return senders.size();
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page;
@ -85,11 +88,19 @@ public interface PageSubscription {
// for internal (cursor) classes
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
/**
* Add a scanFunction represented by a ToIntFunction
* the execution will be done when you call {@link #performScanAck()}
* @param retryBeforeScan if this function is called and returns true, the scan for this element will not be called. It would be caller's responsibility to call found.
* @param scanFunction
* @param found
* @param notFound
*/
void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound);
// Add a scan function to be performed. It will be completed when you call performScan
void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound);
// it will schedule a scan on pages for everything that was added through addScanAck
/**
* It will perform all scans added by {@link #addScanAck(BooleanSupplier, ToIntFunction, Runnable, Runnable)}
*/
void performScanAck();

View File

@ -32,6 +32,8 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@ -117,11 +119,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
private final LinkedList<PageScan> scanList = new LinkedList();
private static class PageScan {
final Comparable<PagedReference> scanFunction;
final BooleanSupplier retryBeforeScan;
final ToIntFunction<PagedReference> scanFunction;
final Runnable found;
final Runnable notfound;
final Runnable notFound;
public Comparable<PagedReference> getScanFunction() {
public ToIntFunction<PagedReference> getScanFunction() {
return scanFunction;
}
@ -130,19 +133,20 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
public Runnable getNotfound() {
return notfound;
return notFound;
}
PageScan(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
PageScan(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
this.retryBeforeScan = retryBeforeScan;
this.scanFunction = scanFunction;
this.found = found;
this.notfound = notfound;
this.notFound = notFound;
}
}
@Override
public void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
PageScan scan = new PageScan(scanFunction, found, notfound);
public void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
PageScan scan = new PageScan(retryBeforeScan, scanFunction, found, notFound);
synchronized (scanList) {
scanList.add(scan);
}
@ -153,7 +157,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
// we should only have a max of 2 scheduled tasks
// one that's might still be currently running, and another one lined up
// no need for more than that
if (scheduledScanCount.incrementAndGet() < 2) {
if (scheduledScanCount.incrementAndGet() <= 2) {
executor.execute(this::actualScanAck);
} else {
scheduledScanCount.decrementAndGet();
@ -171,6 +175,32 @@ public final class PageSubscriptionImpl implements PageSubscription {
scanList.clear();
}
int retriedFound = 0;
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen.retryBeforeScan != null && scanElemen.retryBeforeScan.getAsBoolean()) {
localScanList[i] = null;
retriedFound++;
}
}
if (retriedFound == localScanList.length) {
return;
}
if (!isPaging()) {
// this would mean that between the submit and now the system left paging mode
// at this point we will just return everything as notFound
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen != null && scanElemen.notFound != null) {
scanElemen.notFound.run();
}
}
return;
}
LinkedList<Runnable> afterCommitList = new LinkedList<>();
TransactionImpl tx = new TransactionImpl(store);
tx.addOperation(new TransactionOperationAbstract() {
@ -196,7 +226,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
continue;
}
int result = scanElemen.scanFunction.compareTo(reference);
int result = scanElemen.scanFunction.applyAsInt(reference);
if (result >= 0) {
if (result == 0) {
@ -209,8 +239,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
logger.warn(e.getMessage(), e);
}
} else {
if (scanElemen.notfound != null) {
scanElemen.notfound.run();
if (scanElemen.notFound != null) {
scanElemen.notFound.run();
}
}
localScanList[i] = null;
@ -228,8 +258,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
for (int i = 0; i < localScanList.length; i++) {
if (localScanList[i] != null && localScanList[i].notfound != null) {
localScanList[i].notfound.run();
if (localScanList[i] != null && localScanList[i].notFound != null) {
localScanList[i].notFound.run();
}
localScanList[i] = null;
}
@ -241,6 +271,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
logger.warn(e.getMessage(), e);
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
scheduledScanCount.decrementAndGet();
}

View File

@ -640,7 +640,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.startBrokerConnection(brokerConnectionName);
}
snfreplica = server_2.locateQueue(replica.getMirrorSNF());
Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
if (pagingTarget) {
assertTrue(queueOnServer1.getPagingStore().isPaging());
@ -852,7 +852,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
int LAST_ID,
int port,
boolean assertNull) throws JMSException {
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port);
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@ -26,12 +27,24 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
@ -77,6 +90,7 @@ public class PagedMirrorTest extends ActiveMQTestBase {
String secondConsumeURI = "tcp://localhost:61617";
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf1);
@ -152,6 +166,128 @@ public class PagedMirrorTest extends ActiveMQTestBase {
}
}
@Test
public void testAckWithScan() throws Throwable {
String sendURI = "tcp://localhost:61616";
String consumeURI = "tcp://localhost:61617";
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf1);
org.apache.activemq.artemis.core.server.Queue snf2 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf2);
File countJournalLocation = server1.getConfiguration().getJournalLocation();
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
String protocol = "amqp";
ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
String bodyBuffer;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 1024; i++) {
buffer.append("*");
}
bodyBuffer = buffer.toString();
}
int NUMBER_OF_MESSAGES = 200;
try (Connection sendConnecton = sendCF.createConnection()) {
Session sendSession = sendConnecton.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = sendSession.createQueue("someQueue");
MessageProducer producer = sendSession.createProducer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sendSession.createTextMessage(bodyBuffer);
message.setIntProperty("i", i);
producer.send(message);
}
sendSession.commit();
}
Wait.assertEquals(0, snf1::getMessageCount);
Wait.assertEquals(0, snf2::getMessageCount);
org.apache.activemq.artemis.core.server.Queue serverQueue2 = server2.locateQueue("someQueue");
Assert.assertNotNull(serverQueue2);
org.apache.activemq.artemis.core.server.Queue serverQueue1 = server1.locateQueue("someQueue");
Assert.assertNotNull(serverQueue1);
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue2::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue1::getMessageCount);
ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI);
try (Connection connection = consumeCF.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 10; i++) {
Message recMessage = consumer.receive(5000);
Assert.assertNotNull(recMessage);
}
session.commit();
}
Wait.assertEquals(NUMBER_OF_MESSAGES - 10, serverQueue2::getMessageCount);
LinkedList<MessageReference> refs = new LinkedList<>();
serverQueue2.forEach((t) -> refs.add(t));
AMQPMirrorControllerTarget controllerTarget = locateMirrorTarget(server2);
CountDownLatch latch = new CountDownLatch(refs.size());
IOCallback callback = new IOCallback() {
@Override
public void done() {
latch.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
for (MessageReference r : refs) {
Long messageID = (Long)r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-mr-id"));
Object nodeID = r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-bkr-id"));
// this will force the retry on the queue after a depage happened
controllerTarget.performAckOnPage(nodeID.toString(), messageID, serverQueue2, callback);
}
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Wait.assertEquals(NUMBER_OF_MESSAGES - 10 - refs.size(), serverQueue2::getMessageCount);
}
protected static AMQPMirrorControllerTarget locateMirrorTarget(ActiveMQServer server) {
ActiveMQServerImpl theServer = (ActiveMQServerImpl) server;
for (RemotingConnection connection : theServer.getRemotingService().getConnections()) {
if (connection instanceof ActiveMQProtonRemotingConnection) {
ActiveMQProtonRemotingConnection protonRC = (ActiveMQProtonRemotingConnection) connection;
for (AMQPSessionContext sessionContext : protonRC.getAmqpConnection().getSessions().values()) {
for (ProtonAbstractReceiver receiver : sessionContext.getReceivers().values()) {
if (receiver instanceof AMQPMirrorControllerTarget) {
return (AMQPMirrorControllerTarget) receiver;
}
}
}
}
}
return null;
}
private int acksCount(File countJournalLocation) throws Exception {
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);

View File

@ -24,6 +24,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -38,6 +39,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
@ -120,24 +122,28 @@ public class PageAckScanTest extends ActiveMQTestBase {
session.commit();
}
queue.forEach((r) -> System.out.println("ref -> " + r.getMessage().getIntProperty("i")));
AtomicInteger errors = new AtomicInteger(0);
ReusableLatch latch = new ReusableLatch(4);
Runnable done = latch::countDown;
Runnable notFound = () -> {
errors.incrementAndGet();
done.run();
};
AtomicInteger retried = new AtomicInteger(0);
PageSubscription subscription = queue.getPageSubscription();
subscription.addScanAck(new CompareI(15), done, notFound);
subscription.addScanAck(new CompareI(11), done, notFound);
subscription.addScanAck(new CompareI(99), done, notFound);
subscription.addScanAck(new CompareI(-30), done, notFound);
System.out.println("Performing scan...");
subscription.addScanAck(() -> false, new CompareI(15), done, notFound);
subscription.addScanAck(() -> false, new CompareI(11), done, notFound);
subscription.addScanAck(() -> false, new CompareI(99), done, notFound);
subscription.addScanAck(() -> false, new CompareI(-30), done, notFound);
subscription.addScanAck(() -> true, new CompareI(333), retried::incrementAndGet, notFound);
subscription.performScanAck();
Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
Assert.assertEquals(2, errors.get());
Wait.assertEquals(1, retried::get);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -153,17 +159,14 @@ public class PageAckScanTest extends ActiveMQTestBase {
}
}
// Errorprone would barf at this. it was really intended
@SuppressWarnings("ComparableType")
class CompareI implements Comparable<PagedReference> {
class CompareI implements ToIntFunction<PagedReference> {
final int i;
CompareI(int i) {
this.i = i;
}
@Override
public int compareTo(PagedReference ref) {
System.out.println("Comparing against " + ref.getMessage().getIntProperty("i"));
public int applyAsInt(PagedReference ref) {
return ref.getMessage().getIntProperty("i").intValue() - i;
}
}