ARTEMIS-3464 Improving ACK reliability on Paging and code improvements
This commit is contained in:
parent
779d146d35
commit
2fb23474ce
|
@ -17,6 +17,8 @@
|
||||||
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
|
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.function.BooleanSupplier;
|
||||||
|
import java.util.function.ToIntFunction;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||||
|
@ -354,18 +356,11 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performAckOnPage(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation) {
|
|
||||||
if (targetQueue.getPagingStore().isPaging()) {
|
public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
|
||||||
PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation);
|
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
|
||||||
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck);
|
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck, pageAck);
|
||||||
targetQueue.getPageSubscription().performScanAck();
|
targetQueue.getPageSubscription().performScanAck();
|
||||||
} else {
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("Post ack Server " + server + " could not find messageID = " + messageID +
|
|
||||||
" representing nodeID=" + nodeID);
|
|
||||||
}
|
|
||||||
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
|
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
|
||||||
|
@ -487,23 +482,44 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
|
||||||
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
|
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// I need a supress warning annotation here
|
class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {
|
||||||
// because errorProne is issuing an error her, however I really intend to compare PageACK against PagedReference
|
|
||||||
@SuppressWarnings("ComparableType")
|
|
||||||
class PageAck implements Comparable<PagedReference>, Runnable {
|
|
||||||
|
|
||||||
|
final Queue targetQueue;
|
||||||
final String nodeID;
|
final String nodeID;
|
||||||
final long messageID;
|
final long messageID;
|
||||||
final ACKMessageOperation operation;
|
final IOCallback operation;
|
||||||
|
|
||||||
PageAck(String nodeID, long messageID, ACKMessageOperation operation) {
|
PageAck(Queue targetQueue, String nodeID, long messageID, IOCallback operation) {
|
||||||
|
this.targetQueue = targetQueue;
|
||||||
this.nodeID = nodeID;
|
this.nodeID = nodeID;
|
||||||
this.messageID = messageID;
|
this.messageID = messageID;
|
||||||
this.operation = operation;
|
this.operation = operation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to retry the ack before a scan
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(PagedReference reference) {
|
public boolean getAsBoolean() {
|
||||||
|
try {
|
||||||
|
recoverContext();
|
||||||
|
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
|
||||||
|
if (reference == null) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
targetQueue.acknowledge(reference);
|
||||||
|
OperationContextImpl.getContext().executeOnCompletion(operation);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int applyAsInt(PagedReference reference) {
|
||||||
String refNodeID = referenceNodeStore.getServerID(reference);
|
String refNodeID = referenceNodeStore.getServerID(reference);
|
||||||
long refMessageID = referenceNodeStore.getID(reference);
|
long refMessageID = referenceNodeStore.getID(reference);
|
||||||
if (refNodeID == null) {
|
if (refNodeID == null) {
|
||||||
|
@ -526,7 +542,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
operation.connectionRun();
|
operation.done();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,6 +214,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
||||||
return protonSession;
|
return protonSession;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<Session, AMQPSessionContext> getSessions() {
|
||||||
|
return sessions;
|
||||||
|
}
|
||||||
|
|
||||||
public SecurityAuth getSecurityAuth() {
|
public SecurityAuth getSecurityAuth() {
|
||||||
return new LocalSecurity();
|
return new LocalSecurity();
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,6 +275,10 @@ public class AMQPSessionContext extends ProtonInitializable {
|
||||||
return receivers.size();
|
return receivers.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<Receiver, ProtonAbstractReceiver> getReceivers() {
|
||||||
|
return receivers;
|
||||||
|
}
|
||||||
|
|
||||||
public int getSenderCount() {
|
public int getSenderCount() {
|
||||||
return senders.size();
|
return senders.size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.paging.cursor;
|
package org.apache.activemq.artemis.core.paging.cursor;
|
||||||
|
|
||||||
|
import java.util.function.BooleanSupplier;
|
||||||
|
import java.util.function.ToIntFunction;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
import org.apache.activemq.artemis.core.paging.PagedMessage;
|
||||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||||
|
@ -85,11 +88,19 @@ public interface PageSubscription {
|
||||||
// for internal (cursor) classes
|
// for internal (cursor) classes
|
||||||
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
|
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a scanFunction represented by a ToIntFunction
|
||||||
|
* the execution will be done when you call {@link #performScanAck()}
|
||||||
|
* @param retryBeforeScan if this function is called and returns true, the scan for this element will not be called. It would be caller's responsibility to call found.
|
||||||
|
* @param scanFunction
|
||||||
|
* @param found
|
||||||
|
* @param notFound
|
||||||
|
*/
|
||||||
|
void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound);
|
||||||
|
|
||||||
// Add a scan function to be performed. It will be completed when you call performScan
|
/**
|
||||||
void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound);
|
* It will perform all scans added by {@link #addScanAck(BooleanSupplier, ToIntFunction, Runnable, Runnable)}
|
||||||
|
*/
|
||||||
// it will schedule a scan on pages for everything that was added through addScanAck
|
|
||||||
void performScanAck();
|
void performScanAck();
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,8 @@ import java.util.TreeMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.function.BooleanSupplier;
|
||||||
|
import java.util.function.ToIntFunction;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
@ -117,11 +119,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
private final LinkedList<PageScan> scanList = new LinkedList();
|
private final LinkedList<PageScan> scanList = new LinkedList();
|
||||||
|
|
||||||
private static class PageScan {
|
private static class PageScan {
|
||||||
final Comparable<PagedReference> scanFunction;
|
final BooleanSupplier retryBeforeScan;
|
||||||
|
final ToIntFunction<PagedReference> scanFunction;
|
||||||
final Runnable found;
|
final Runnable found;
|
||||||
final Runnable notfound;
|
final Runnable notFound;
|
||||||
|
|
||||||
public Comparable<PagedReference> getScanFunction() {
|
public ToIntFunction<PagedReference> getScanFunction() {
|
||||||
return scanFunction;
|
return scanFunction;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,19 +133,20 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Runnable getNotfound() {
|
public Runnable getNotfound() {
|
||||||
return notfound;
|
return notFound;
|
||||||
}
|
}
|
||||||
|
|
||||||
PageScan(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
|
PageScan(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
|
||||||
|
this.retryBeforeScan = retryBeforeScan;
|
||||||
this.scanFunction = scanFunction;
|
this.scanFunction = scanFunction;
|
||||||
this.found = found;
|
this.found = found;
|
||||||
this.notfound = notfound;
|
this.notFound = notFound;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
|
public void addScanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
|
||||||
PageScan scan = new PageScan(scanFunction, found, notfound);
|
PageScan scan = new PageScan(retryBeforeScan, scanFunction, found, notFound);
|
||||||
synchronized (scanList) {
|
synchronized (scanList) {
|
||||||
scanList.add(scan);
|
scanList.add(scan);
|
||||||
}
|
}
|
||||||
|
@ -153,7 +157,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
// we should only have a max of 2 scheduled tasks
|
// we should only have a max of 2 scheduled tasks
|
||||||
// one that's might still be currently running, and another one lined up
|
// one that's might still be currently running, and another one lined up
|
||||||
// no need for more than that
|
// no need for more than that
|
||||||
if (scheduledScanCount.incrementAndGet() < 2) {
|
if (scheduledScanCount.incrementAndGet() <= 2) {
|
||||||
executor.execute(this::actualScanAck);
|
executor.execute(this::actualScanAck);
|
||||||
} else {
|
} else {
|
||||||
scheduledScanCount.decrementAndGet();
|
scheduledScanCount.decrementAndGet();
|
||||||
|
@ -171,6 +175,32 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
scanList.clear();
|
scanList.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int retriedFound = 0;
|
||||||
|
for (int i = 0; i < localScanList.length; i++) {
|
||||||
|
PageScan scanElemen = localScanList[i];
|
||||||
|
if (scanElemen.retryBeforeScan != null && scanElemen.retryBeforeScan.getAsBoolean()) {
|
||||||
|
localScanList[i] = null;
|
||||||
|
retriedFound++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (retriedFound == localScanList.length) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isPaging()) {
|
||||||
|
// this would mean that between the submit and now the system left paging mode
|
||||||
|
// at this point we will just return everything as notFound
|
||||||
|
for (int i = 0; i < localScanList.length; i++) {
|
||||||
|
PageScan scanElemen = localScanList[i];
|
||||||
|
if (scanElemen != null && scanElemen.notFound != null) {
|
||||||
|
scanElemen.notFound.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
LinkedList<Runnable> afterCommitList = new LinkedList<>();
|
LinkedList<Runnable> afterCommitList = new LinkedList<>();
|
||||||
TransactionImpl tx = new TransactionImpl(store);
|
TransactionImpl tx = new TransactionImpl(store);
|
||||||
tx.addOperation(new TransactionOperationAbstract() {
|
tx.addOperation(new TransactionOperationAbstract() {
|
||||||
|
@ -196,7 +226,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int result = scanElemen.scanFunction.compareTo(reference);
|
int result = scanElemen.scanFunction.applyAsInt(reference);
|
||||||
|
|
||||||
if (result >= 0) {
|
if (result >= 0) {
|
||||||
if (result == 0) {
|
if (result == 0) {
|
||||||
|
@ -209,8 +239,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
logger.warn(e.getMessage(), e);
|
logger.warn(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (scanElemen.notfound != null) {
|
if (scanElemen.notFound != null) {
|
||||||
scanElemen.notfound.run();
|
scanElemen.notFound.run();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
localScanList[i] = null;
|
localScanList[i] = null;
|
||||||
|
@ -228,8 +258,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < localScanList.length; i++) {
|
for (int i = 0; i < localScanList.length; i++) {
|
||||||
if (localScanList[i] != null && localScanList[i].notfound != null) {
|
if (localScanList[i] != null && localScanList[i].notFound != null) {
|
||||||
localScanList[i].notfound.run();
|
localScanList[i].notFound.run();
|
||||||
}
|
}
|
||||||
localScanList[i] = null;
|
localScanList[i] = null;
|
||||||
}
|
}
|
||||||
|
@ -241,6 +271,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
logger.warn(e.getMessage(), e);
|
logger.warn(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
} finally {
|
} finally {
|
||||||
scheduledScanCount.decrementAndGet();
|
scheduledScanCount.decrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
|
@ -640,7 +640,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
|
||||||
server_2.startBrokerConnection(brokerConnectionName);
|
server_2.startBrokerConnection(brokerConnectionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
snfreplica = server_2.locateQueue(replica.getMirrorSNF());
|
Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
|
||||||
|
|
||||||
if (pagingTarget) {
|
if (pagingTarget) {
|
||||||
assertTrue(queueOnServer1.getPagingStore().isPaging());
|
assertTrue(queueOnServer1.getPagingStore().isPaging());
|
||||||
|
@ -852,7 +852,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
|
||||||
int LAST_ID,
|
int LAST_ID,
|
||||||
int port,
|
int port,
|
||||||
boolean assertNull) throws JMSException {
|
boolean assertNull) throws JMSException {
|
||||||
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
|
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port);
|
||||||
Connection conn = cf.createConnection();
|
Connection conn = cf.createConnection();
|
||||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
conn.start();
|
conn.start();
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp.connect;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
|
@ -26,12 +27,24 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
|
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
|
||||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
|
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
@ -77,6 +90,7 @@ public class PagedMirrorTest extends ActiveMQTestBase {
|
||||||
String secondConsumeURI = "tcp://localhost:61617";
|
String secondConsumeURI = "tcp://localhost:61617";
|
||||||
|
|
||||||
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
|
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
|
||||||
|
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
|
||||||
|
|
||||||
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
|
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
|
||||||
Assert.assertNotNull(snf1);
|
Assert.assertNotNull(snf1);
|
||||||
|
@ -152,6 +166,128 @@ public class PagedMirrorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAckWithScan() throws Throwable {
|
||||||
|
String sendURI = "tcp://localhost:61616";
|
||||||
|
String consumeURI = "tcp://localhost:61617";
|
||||||
|
|
||||||
|
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
|
||||||
|
Wait.waitFor(() -> server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
|
||||||
|
|
||||||
|
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
|
||||||
|
Assert.assertNotNull(snf1);
|
||||||
|
|
||||||
|
org.apache.activemq.artemis.core.server.Queue snf2 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
|
||||||
|
Assert.assertNotNull(snf2);
|
||||||
|
|
||||||
|
File countJournalLocation = server1.getConfiguration().getJournalLocation();
|
||||||
|
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
|
||||||
|
String protocol = "amqp";
|
||||||
|
|
||||||
|
ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
|
||||||
|
|
||||||
|
String bodyBuffer;
|
||||||
|
{
|
||||||
|
StringBuffer buffer = new StringBuffer();
|
||||||
|
for (int i = 0; i < 1024; i++) {
|
||||||
|
buffer.append("*");
|
||||||
|
}
|
||||||
|
bodyBuffer = buffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
int NUMBER_OF_MESSAGES = 200;
|
||||||
|
|
||||||
|
try (Connection sendConnecton = sendCF.createConnection()) {
|
||||||
|
Session sendSession = sendConnecton.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue jmsQueue = sendSession.createQueue("someQueue");
|
||||||
|
MessageProducer producer = sendSession.createProducer(jmsQueue);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||||
|
TextMessage message = sendSession.createTextMessage(bodyBuffer);
|
||||||
|
message.setIntProperty("i", i);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
sendSession.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(0, snf1::getMessageCount);
|
||||||
|
Wait.assertEquals(0, snf2::getMessageCount);
|
||||||
|
|
||||||
|
org.apache.activemq.artemis.core.server.Queue serverQueue2 = server2.locateQueue("someQueue");
|
||||||
|
Assert.assertNotNull(serverQueue2);
|
||||||
|
org.apache.activemq.artemis.core.server.Queue serverQueue1 = server1.locateQueue("someQueue");
|
||||||
|
Assert.assertNotNull(serverQueue1);
|
||||||
|
|
||||||
|
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue2::getMessageCount);
|
||||||
|
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue1::getMessageCount);
|
||||||
|
|
||||||
|
|
||||||
|
ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI);
|
||||||
|
try (Connection connection = consumeCF.createConnection()) {
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue jmsQueue = session.createQueue("someQueue");
|
||||||
|
MessageConsumer consumer = session.createConsumer(jmsQueue);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Message recMessage = consumer.receive(5000);
|
||||||
|
Assert.assertNotNull(recMessage);
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertEquals(NUMBER_OF_MESSAGES - 10, serverQueue2::getMessageCount);
|
||||||
|
|
||||||
|
LinkedList<MessageReference> refs = new LinkedList<>();
|
||||||
|
serverQueue2.forEach((t) -> refs.add(t));
|
||||||
|
|
||||||
|
AMQPMirrorControllerTarget controllerTarget = locateMirrorTarget(server2);
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(refs.size());
|
||||||
|
|
||||||
|
IOCallback callback = new IOCallback() {
|
||||||
|
@Override
|
||||||
|
public void done() {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(int errorCode, String errorMessage) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for (MessageReference r : refs) {
|
||||||
|
Long messageID = (Long)r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-mr-id"));
|
||||||
|
Object nodeID = r.getMessage().getBrokerProperty(SimpleString.toSimpleString("x-opt-amq-bkr-id"));
|
||||||
|
|
||||||
|
// this will force the retry on the queue after a depage happened
|
||||||
|
controllerTarget.performAckOnPage(nodeID.toString(), messageID, serverQueue2, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
Wait.assertEquals(NUMBER_OF_MESSAGES - 10 - refs.size(), serverQueue2::getMessageCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static AMQPMirrorControllerTarget locateMirrorTarget(ActiveMQServer server) {
|
||||||
|
ActiveMQServerImpl theServer = (ActiveMQServerImpl) server;
|
||||||
|
|
||||||
|
for (RemotingConnection connection : theServer.getRemotingService().getConnections()) {
|
||||||
|
if (connection instanceof ActiveMQProtonRemotingConnection) {
|
||||||
|
ActiveMQProtonRemotingConnection protonRC = (ActiveMQProtonRemotingConnection) connection;
|
||||||
|
for (AMQPSessionContext sessionContext : protonRC.getAmqpConnection().getSessions().values()) {
|
||||||
|
for (ProtonAbstractReceiver receiver : sessionContext.getReceivers().values()) {
|
||||||
|
if (receiver instanceof AMQPMirrorControllerTarget) {
|
||||||
|
return (AMQPMirrorControllerTarget) receiver;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private int acksCount(File countJournalLocation) throws Exception {
|
private int acksCount(File countJournalLocation) throws Exception {
|
||||||
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
|
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
|
||||||
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
|
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
|
||||||
|
|
|
@ -24,6 +24,7 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.ToIntFunction;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -38,6 +39,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
import org.apache.activemq.artemis.utils.Wait;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -120,24 +122,28 @@ public class PageAckScanTest extends ActiveMQTestBase {
|
||||||
session.commit();
|
session.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.forEach((r) -> System.out.println("ref -> " + r.getMessage().getIntProperty("i")));
|
|
||||||
|
|
||||||
AtomicInteger errors = new AtomicInteger(0);
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
|
||||||
ReusableLatch latch = new ReusableLatch(4);
|
ReusableLatch latch = new ReusableLatch(4);
|
||||||
|
|
||||||
Runnable done = latch::countDown;
|
Runnable done = latch::countDown;
|
||||||
|
|
||||||
Runnable notFound = () -> {
|
Runnable notFound = () -> {
|
||||||
errors.incrementAndGet();
|
errors.incrementAndGet();
|
||||||
done.run();
|
done.run();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
AtomicInteger retried = new AtomicInteger(0);
|
||||||
PageSubscription subscription = queue.getPageSubscription();
|
PageSubscription subscription = queue.getPageSubscription();
|
||||||
subscription.addScanAck(new CompareI(15), done, notFound);
|
subscription.addScanAck(() -> false, new CompareI(15), done, notFound);
|
||||||
subscription.addScanAck(new CompareI(11), done, notFound);
|
subscription.addScanAck(() -> false, new CompareI(11), done, notFound);
|
||||||
subscription.addScanAck(new CompareI(99), done, notFound);
|
subscription.addScanAck(() -> false, new CompareI(99), done, notFound);
|
||||||
subscription.addScanAck(new CompareI(-30), done, notFound);
|
subscription.addScanAck(() -> false, new CompareI(-30), done, notFound);
|
||||||
System.out.println("Performing scan...");
|
subscription.addScanAck(() -> true, new CompareI(333), retried::incrementAndGet, notFound);
|
||||||
subscription.performScanAck();
|
subscription.performScanAck();
|
||||||
Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
|
Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
|
||||||
Assert.assertEquals(2, errors.get());
|
Assert.assertEquals(2, errors.get());
|
||||||
|
Wait.assertEquals(1, retried::get);
|
||||||
|
|
||||||
try (Connection connection = factory.createConnection()) {
|
try (Connection connection = factory.createConnection()) {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -153,17 +159,14 @@ public class PageAckScanTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Errorprone would barf at this. it was really intended
|
class CompareI implements ToIntFunction<PagedReference> {
|
||||||
@SuppressWarnings("ComparableType")
|
|
||||||
class CompareI implements Comparable<PagedReference> {
|
|
||||||
final int i;
|
final int i;
|
||||||
CompareI(int i) {
|
CompareI(int i) {
|
||||||
this.i = i;
|
this.i = i;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(PagedReference ref) {
|
public int applyAsInt(PagedReference ref) {
|
||||||
System.out.println("Comparing against " + ref.getMessage().getIntProperty("i"));
|
|
||||||
return ref.getMessage().getIntProperty("i").intValue() - i;
|
return ref.getMessage().getIntProperty("i").intValue() - i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue