ARTEMIS-3464 Missing ACKs on Page and Mirror

Clebert Suconic 2021-08-30 17:09:17 -04:00
parent 20c1836fa2
commit 7fb4f80649
17 changed files with 1536 additions and 21 deletions

View File

@ -393,6 +393,10 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true);
}
if (logger.isDebugEnabled()) {
logger.debug("Mirror queue " + mirrorControlQueue.getName());
}
mirrorControlQueue.setMirrorController(true);
QueueBinding snfReplicaQueueBinding = (QueueBinding)server.getPostOffice().getBinding(getMirrorSNF(replicaConfig));

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -45,7 +46,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@ -95,7 +95,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
public TransactionOperationAbstract tx = new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
completeOperation();
connectionRun();
}
};
@ -121,10 +121,10 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
@Override
public void done() {
completeOperation();
connectionRun();
}
private void completeOperation() {
public void connectionRun() {
connection.runNow(ACKMessageOperation.this);
}
@ -146,7 +146,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
private final NodeStore<MessageReference> referenceNodeStore;
private final ReferenceNodeStore referenceNodeStore;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
@ -354,6 +354,20 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
private void performAckOnPage(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation) {
if (targetQueue.getPagingStore().isPaging()) {
PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck);
targetQueue.getPageSubscription().performScanAck();
} else {
if (logger.isDebugEnabled()) {
logger.debug("Post ack Server " + server + " could not find messageID = " + messageID +
" representing nodeID=" + nodeID);
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
}
}
private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
@ -380,10 +394,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
logger.warn(e.getMessage(), e);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Post ack Server " + server + " could not find messageID = " + messageID +
" representing nodeID=" + nodeID);
}
performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
}
}
@ -476,4 +487,48 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
}
// I need a suppress-warnings annotation here
// because ErrorProne is issuing an error here; however, I really intend to compare PageAck against PagedReference
@SuppressWarnings("ComparableType")
class PageAck implements Comparable<PagedReference>, Runnable {
final String nodeID;
final long messageID;
final ACKMessageOperation operation;
PageAck(String nodeID, long messageID, ACKMessageOperation operation) {
this.nodeID = nodeID;
this.messageID = messageID;
this.operation = operation;
}
@Override
public int compareTo(PagedReference reference) {
String refNodeID = referenceNodeStore.getServerID(reference);
long refMessageID = referenceNodeStore.getID(reference);
if (refNodeID == null) {
refNodeID = referenceNodeStore.getDefaultNodeID();
}
if (refNodeID.equals(nodeID)) {
long diff = refMessageID - messageID;
if (diff == 0) {
return 0;
} else if (diff > 0) {
return 1;
} else {
return -1;
}
} else {
return -1;
}
}
@Override
public void run() {
operation.connectionRun();
}
}
}
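The PageAck class above does double duty: it is the Comparable the page scanner uses to locate the mirrored message, and the Runnable invoked for both the found and notfound outcomes, so the pending ACKMessageOperation always completes. Its compareTo contract is easier to see spelled out; below is a sketch of the same logic using Long.compare, with the node-ID resolution through ReferenceNodeStore simplified away (the class name and the use of the local message ID are illustrative, not part of the commit):

import org.apache.activemq.artemis.core.paging.cursor.PagedReference;

// Sketch of the contract PageAck.compareTo implements for the page scanner:
//   0  -> this PagedReference is the message being acked (the scanner acks it and runs "found")
//  >0  -> the scan has already moved past where the target would be (the scanner runs "notfound")
//  <0  -> the target may still be ahead, keep iterating
@SuppressWarnings("ComparableType") // intentionally compared against PagedReference, as in the commit
class MirrorAckMatcher implements Comparable<PagedReference> {

   final long messageID;

   MirrorAckMatcher(long messageID) {
      this.messageID = messageID;
   }

   @Override
   public int compareTo(PagedReference reference) {
      // The commit resolves the mirrored id and node via ReferenceNodeStore;
      // this sketch just uses the local message id for illustration.
      return Long.compare(reference.getMessage().getMessageID(), messageID);
   }
}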

View File

@ -42,6 +42,11 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
String lruListID;
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
public String getDefaultNodeID() {
return serverID;
}
@Override
public void storeNode(MessageReference element, LinkedListImpl.Node<MessageReference> node) {
String list = getServerID(element);

View File

@ -85,9 +85,17 @@ public interface PageSubscription {
// for internal (cursor) classes
void confirmPosition(Transaction tx, PagePosition position) throws Exception;
/**
* @return the first page in use or MAX_LONG if none is in use
*/
// Adds a scan function to be performed. It will be completed when you call performScanAck()
void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound);
// It will schedule a scan over the pages for everything that was added through addScanAck()
void performScanAck();
/**
* @return the first page in use or MAX_LONG if none is in use
*/
long getFirstPage();
// Reload operations
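Taken together, the two methods added above give callers a batched, comparator-driven way to ack messages that only exist in the page files: register a match function plus found/notfound callbacks with addScanAck, then trigger the scan with performScanAck. A minimal caller-side sketch (the helper class, target id and printouts are illustrative; the compareTo contract is the one sketched after the mirror-controller diff above):

import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.server.Queue;

public class ScanAckSketch {

   // Hypothetical helper: ack a single message that may only exist in the page files.
   static void ackWhilePaging(Queue queue, long targetMessageID) {
      PageSubscription subscription = queue.getPageSubscription();

      // 0 = match, >0 = scan already past the target, <0 = keep scanning
      Comparable<PagedReference> matcher =
            ref -> Long.compare(ref.getMessage().getMessageID(), targetMessageID);

      subscription.addScanAck(matcher,
                              () -> System.out.println("acked " + targetMessageID),      // found
                              () -> System.out.println("not found " + targetMessageID)); // notfound

      // Schedules the batched scan on the subscription's executor; every request
      // added before the scan actually runs is processed in the same pass.
      subscription.performScanAck();
   }
}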

View File

@ -112,6 +112,139 @@ public final class PageSubscriptionImpl implements PageSubscription {
// Each CursorIterator will record their current PageReader in this map
private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>();
private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
private final LinkedList<PageScan> scanList = new LinkedList<>();
private static class PageScan {
final Comparable<PagedReference> scanFunction;
final Runnable found;
final Runnable notfound;
public Comparable<PagedReference> getScanFunction() {
return scanFunction;
}
public Runnable getFound() {
return found;
}
public Runnable getNotfound() {
return notfound;
}
PageScan(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
this.scanFunction = scanFunction;
this.found = found;
this.notfound = notfound;
}
}
@Override
public void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
PageScan scan = new PageScan(scanFunction, found, notfound);
synchronized (scanList) {
scanList.add(scan);
}
}
@Override
public void performScanAck() {
// we should only have a max of 2 scheduled tasks:
// one that might still be currently running, and another one lined up.
// No need for more than that, as the queued run will drain whatever was added meanwhile.
if (scheduledScanCount.incrementAndGet() < 2) {
executor.execute(this::actualScanAck);
} else {
scheduledScanCount.decrementAndGet();
}
}
private void actualScanAck() {
try {
PageScan[] localScanList;
synchronized (scanList) {
if (scanList.size() == 0) {
return;
}
localScanList = scanList.stream().toArray(i -> new PageScan[i]);
scanList.clear();
}
LinkedList<Runnable> afterCommitList = new LinkedList<>();
TransactionImpl tx = new TransactionImpl(store);
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
for (Runnable r : afterCommitList) {
try {
r.run();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
});
PageIterator iterator = this.iterator(true);
try {
while (iterator.hasNext()) {
PagedReference reference = iterator.next();
boolean keepMoving = false;
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElement = localScanList[i];
if (scanElement == null) {
continue;
}
int result = scanElement.scanFunction.compareTo(reference);
if (result >= 0) {
if (result == 0) {
try {
PageSubscriptionImpl.this.ackTx(tx, reference);
if (scanElement.found != null) {
afterCommitList.add(scanElement.found);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} else {
if (scanElement.notfound != null) {
scanElement.notfound.run();
}
}
localScanList[i] = null;
} else {
keepMoving = true;
}
}
if (!keepMoving) {
break;
}
}
} finally {
iterator.close();
}
for (int i = 0; i < localScanList.length; i++) {
if (localScanList[i] != null && localScanList[i].notfound != null) {
localScanList[i].notfound.run();
}
localScanList[i] = null;
}
if (afterCommitList.size() > 0) {
try {
tx.commit();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
} finally {
scheduledScanCount.decrementAndGet();
}
}
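The counter logic above ("max of 2 scheduled tasks") is a small coalescing pattern: at most one scan can be running and one queued, and any further performScanAck calls simply piggyback on the queued run, which will drain whatever has been added to scanList by then. A standalone reduction of that pattern, with illustrative names:

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative reduction of the scheduling used by performScanAck()/actualScanAck():
// at most two tasks are ever outstanding - one possibly running, one queued -
// so repeated requests coalesce instead of flooding the executor.
class CoalescingScheduler {

   private final AtomicInteger scheduled = new AtomicInteger(0);
   private final Executor executor;
   private final Runnable task;

   CoalescingScheduler(Executor executor, Runnable task) {
      this.executor = executor;
      this.task = task;
   }

   void schedule() {
      if (scheduled.incrementAndGet() < 2) {
         executor.execute(this::run);
      } else {
         scheduled.decrementAndGet(); // a run is already queued; it will pick this work up
      }
   }

   private void run() {
      try {
         task.run();
      } finally {
         scheduled.decrementAndGet();
      }
   }
}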
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,

View File

@ -26,6 +26,15 @@ import org.apache.activemq.artemis.core.server.MessageReference;
*/
public abstract class TransactionOperationAbstract implements TransactionOperation {
public static TransactionOperationAbstract afterCommit(Runnable run) {
return new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
run.run();
}
};
}
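The new static afterCommit(Runnable) factory above saves callers the anonymous subclass when all they need is a post-commit hook. A minimal usage sketch (the helper method is illustrative; package names are as I recall them from the broker code base):

import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;

public class AfterCommitSketch {

   // Runs the callback only once the transaction actually commits;
   // on rollback the callback is simply never invoked.
   static void runAfterCommit(Transaction tx, Runnable callback) {
      tx.addOperation(TransactionOperationAbstract.afterCommit(callback));
   }
}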
@Override
public void beforePrepare(Transaction tx) throws Exception {

View File

@ -1767,10 +1767,15 @@ public abstract class ActiveMQTestBase extends Assert {
* @throws Exception
*/
protected HashMap<Integer, AtomicInteger> countJournal(Configuration config) throws Exception {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
File location = config.getJournalLocation();
return countJournal(location, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles());
}
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
protected HashMap<Integer, AtomicInteger> countJournal(File location, int journalFileSize, int minFiles, int poolfiles) throws Exception {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(location, null, 1);
JournalImpl messagesJournal = new JournalImpl(journalFileSize, minFiles, poolfiles, 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> filesToRead = messagesJournal.orderFiles();
for (JournalFile file : filesToRead) {

View File

@ -25,9 +25,9 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -640,6 +640,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.startBrokerConnection(brokerConnectionName);
}
snfreplica = server_2.locateQueue(replica.getMirrorSNF());
if (pagingTarget) {
assertTrue(queueOnServer1.getPagingStore().isPaging());
}
@ -647,10 +649,13 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
if (acks) {
consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES / 2 - 1, AMQP_PORT_2, false);
// Replica is async, so we need to wait for the acks to arrive before we finish consuming there
Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES / 2, queueOnServer1::getMessageCount);
// we consume on replica, as half the messages were acked
consumeMessages(largeMessage, NUMBER_OF_MESSAGES / 2, NUMBER_OF_MESSAGES - 1, AMQP_PORT, true); // We consume on both servers as this is currently replicated
Wait.assertEquals(0, snfreplica::getMessageCount);
consumeMessages(largeMessage, NUMBER_OF_MESSAGES / 2, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, false);
Wait.assertEquals(0, snfreplica::getMessageCount);
if (largeMessage) {
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
@ -658,7 +663,9 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
} else {
consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, true);
Wait.assertEquals(0, snfreplica::getMessageCount);
consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES - 1, AMQP_PORT, true);
Wait.assertEquals(0, snfreplica::getMessageCount);
if (largeMessage) {
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0); // we kept half of the messages
@ -845,23 +852,31 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
int LAST_ID,
int port,
boolean assertNull) throws JMSException {
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port);
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
HashSet<Integer> idsReceived = new HashSet<>();
MessageConsumer consumer = sess.createConsumer(sess.createQueue(getQueueName()));
for (int i = START_ID; i <= LAST_ID; i++) {
Message message = consumer.receive(3000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
if (message instanceof TextMessage) {
Assert.assertEquals(getText(largeMessage, i), ((TextMessage) message).getText());
}
Integer id = message.getIntProperty("i");
Assert.assertNotNull(id);
Assert.assertTrue(idsReceived.add(id));
}
if (assertNull) {
Assert.assertNull(consumer.receiveNoWait());
}
for (int i = START_ID; i <= LAST_ID; i++) {
Assert.assertTrue(idsReceived.remove(i));
}
Assert.assertTrue(idsReceived.isEmpty());
conn.close();
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PagedMirrorTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(PagedMirrorTest.class);
ActiveMQServer server1;
ActiveMQServer server2;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 * 1024);
server1.getConfiguration().getAcceptorConfigurations().clear();
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
AMQPBrokerConnectConfiguration brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 * 1024);
server2.getConfiguration().getAcceptorConfigurations().clear();
server2.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61617");
brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server1.start();
server2.start();
}
@Test
public void testPaged() throws Throwable {
String sendURI = "tcp://localhost:61616";
String consumeURI = "tcp://localhost:61616";
String secondConsumeURI = "tcp://localhost:61617";
Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") != null);
org.apache.activemq.artemis.core.server.Queue snf1 = server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf1);
org.apache.activemq.artemis.core.server.Queue snf2 = server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
Assert.assertNotNull(snf2);
File countJournalLocation = server1.getConfiguration().getJournalLocation();
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
String protocol = "amqp";
ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI);
ConnectionFactory secondConsumeCF = CFUtil.createConnectionFactory(protocol, secondConsumeURI);
String bodyBuffer;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 1024; i++) {
buffer.append("*");
}
bodyBuffer = buffer.toString();
}
int NUMBER_OF_MESSAGES = 200;
int ACK_I = 77;
try (Connection sendConnection = sendCF.createConnection()) {
Session sendSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = sendSession.createQueue("someQueue");
MessageProducer producer = sendSession.createProducer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sendSession.createTextMessage(bodyBuffer);
message.setIntProperty("i", i);
producer.send(message);
}
sendSession.commit();
}
Wait.assertEquals(0, snf1::getMessageCount);
Wait.assertEquals(0, snf2::getMessageCount);
try (Connection consumeConnection = consumeCF.createConnection()) {
Session consumeSession = consumeConnection.createSession(false, 101); // individual ack
Queue jmsQueue = consumeSession.createQueue("someQueue");
MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
consumeConnection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(6000);
if (message.getIntProperty("i") == ACK_I) {
message.acknowledge();
}
}
Assert.assertNull(consumer.receiveNoWait());
}
Wait.assertEquals(0, snf1::getMessageCount);
Wait.assertEquals(0, snf2::getMessageCount);
Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
try (Connection consumeConnection = secondConsumeCF.createConnection()) {
Session consumeSession = consumeConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = consumeSession.createQueue("someQueue");
MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
consumeConnection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
TextMessage message = (TextMessage) consumer.receive(6000);
Assert.assertNotNull(message);
Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
}
Assert.assertNull(consumer.receiveNoWait());
}
}
private int acksCount(File countJournalLocation) throws Exception {
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
return acksCount != null ? acksCount.get() : 0;
}
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class IndividualAckPagingTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(IndividualAckPagingTest.class);
// Even though the focus of the test is paging, I'm adding a non-paging variant here to verify the test semantics themselves
@Parameterized.Parameters(name = "paging={0}, restartServerBeforeConsume={1}")
public static Collection getParams() {
return Arrays.asList(new Object[][]{{true, false}, {true, true}, {false, false}});
}
protected final boolean paging;
protected final boolean restartServerBeforeConsume;
private static final String ADDRESS = "IndividualAckPagingTest";
ActiveMQServer server;
protected static final int PAGE_MAX = 10 * 1024;
protected static final int PAGE_SIZE = 5 * 1024;
public IndividualAckPagingTest(boolean paging, boolean restartServerBeforeConsume) {
this.paging = paging;
this.restartServerBeforeConsume = restartServerBeforeConsume;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
config.setMessageExpiryScanPeriod(-1);
if (paging) {
server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
} else {
server = createServer(true, config, 10 * 1024 * 1024, -1);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024 * 1024).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
}
server.start();
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
}
@Test
public void testIndividualAckCore() throws Exception {
testIndividualAck("CORE", 1024);
}
@Test
public void testIndividualAckAMQP() throws Exception {
testIndividualAck("AMQP", 1024);
}
public void testIndividualAck(String protocol, int bodySize) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
String extraBody;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
extraBody = buffer.toString();
}
Queue queue = server.locateQueue(ADDRESS);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage(extraBody);
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();
}
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, 101); // INDIVIDUAL-ACK.. same constant for AMQP and CORE
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 100; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
if (message.getIntProperty("i") == 77) {
message.acknowledge();
}
}
Assert.assertNull(consumer.receiveNoWait());
}
if (restartServerBeforeConsume) {
server.stop();
server.start();
}
try (Connection connection = factory.createConnection()) {
Session session;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 99; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertNotEquals(77, message.getIntProperty("i"));
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PageAckScanTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(PageAckScanTest.class);
private static final String ADDRESS = "MessagesExpiredPagingTest";
ActiveMQServer server;
protected static final int PAGE_MAX = 10 * 1024;
protected static final int PAGE_SIZE = 1 * 1024;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
config.setMessageExpiryScanPeriod(-1);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
}
@Test
public void testScanCore() throws Exception {
testScan("CORE", 5000, 1000, 100, 1024);
}
@Test
public void testScanAMQP() throws Exception {
testScan("AMQP", 5000, 1000, 100, 1024);
}
public void testScan(String protocol, int numberOfMessages, int numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
String extraBody;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
extraBody = buffer.toString();
}
Queue queue = server.locateQueue(ADDRESS);
queue.getPagingStore().startPaging();
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
MessageProducer producer = session.createProducer(jmsQueue);
for (int i = 0; i < 20; i++) {
TextMessage message = session.createTextMessage(extraBody);
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();
}
queue.forEach((r) -> System.out.println("ref -> " + r.getMessage().getIntProperty("i")));
AtomicInteger errors = new AtomicInteger(0);
ReusableLatch latch = new ReusableLatch(4);
Runnable done = latch::countDown;
Runnable notFound = () -> {
errors.incrementAndGet();
done.run();
};
PageSubscription subscription = queue.getPageSubscription();
subscription.addScanAck(new CompareI(15), done, notFound);
subscription.addScanAck(new CompareI(11), done, notFound);
subscription.addScanAck(new CompareI(99), done, notFound);
subscription.addScanAck(new CompareI(-30), done, notFound);
System.out.println("Performing scan...");
subscription.performScanAck();
Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
Assert.assertEquals(2, errors.get());
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
connection.start();
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 18; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertTrue(message.getIntProperty("i") != 11 && message.getIntProperty("i") != 15);
}
Assert.assertNull(consumer.receiveNoWait());
}
}
// ErrorProne would barf at this, but it is really intended
@SuppressWarnings("ComparableType")
class CompareI implements Comparable<PagedReference> {
final int i;
CompareI(int i) {
this.i = i;
}
@Override
public int compareTo(PagedReference ref) {
System.out.println("Comparing against " + ref.getMessage().getIntProperty("i"));
return ref.getMessage().getIntProperty("i").intValue() - i;
}
}
}

View File

@ -1015,6 +1015,38 @@
<configuration>${basedir}/target/classes/servers/brokerConnect/serverB</configuration>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-brokerConnection-paged-serverA</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>true</noWeb>
<instance>${basedir}/target/brokerConnect/pagedA</instance>
<configuration>${basedir}/target/classes/servers/brokerConnect/pagedA</configuration>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-brokerConnection-paged-serverB</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>true</noWeb>
<instance>${basedir}/target/brokerConnect/pagedB</instance>
<configuration>${basedir}/target/classes/servers/brokerConnect/pagedB</configuration>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-qdr</id>

View File

@ -0,0 +1,215 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>ServerA</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>1</journal-max-io>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma-separated list, no spaces, just DNS names or IPs;
it should accept IPV6
Warning: Make sure you understand your network topology, as this is meant to validate that your network is healthy.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs; any successful ping will allow the server to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>40000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<broker-connections>
<amqp-connection uri="tcp://localhost:61617" name="outgoing" reconnect-attempts="-1" retry-interval="100" user="B" password="B">
<mirror durable="true"/>
</amqp-connection>
</broker-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>10000</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
<address name="someQueue">
<anycast>
<queue name="someQueue" />
</anycast>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin plugin to log events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -0,0 +1,86 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.utils.pools,org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget,org.apache.activemq.artemis.core.server.impl,org.apache.activemq.artemis.utils.collections,org.apache.activemq.artemis.core.postoffice.impl
# Special logger to debug mirror Security
# Root logger level
logger.level=INFO
# ActiveMQ Artemis logger levels
# These levels are candidates to eventually debug
logger.org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.level=INFO
# if you have issues with CriticalAnalyzer, setting this as TRACE would give you extra troubleshooting information.
# but do not use it regularly as it would incur in some extra CPU usage for this diagnostic.
logger.org.apache.activemq.artemis.utils.critical.level=INFO
logger.org.eclipse.jetty.level=WARN
# Root logger handlers
logger.handlers=FILE,CONSOLE
# to enable audit change the level to INFO
logger.org.apache.activemq.audit.base.level=ERROR
logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
logger.org.apache.activemq.audit.base.useParentHandlers=false
logger.org.apache.activemq.audit.resource.level=ERROR
logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
logger.org.apache.activemq.audit.resource.useParentHandlers=false
logger.org.apache.activemq.audit.message.level=ERROR
logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
logger.org.apache.activemq.audit.message.useParentHandlers=false
# Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=TRACE
handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN
# File handler configuration
handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
handler.FILE.level=TRACE
handler.FILE.properties=suffix,append,autoFlush,fileName
handler.FILE.suffix=.yyyy-MM-dd
handler.FILE.append=true
handler.FILE.autoFlush=true
handler.FILE.fileName=${artemis.instance}/log/artemis.log
handler.FILE.formatter=PATTERN
# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
formatter.PATTERN.pattern=[%t] %d %-5p [%c] %s%E%n
#Audit logger
handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
handler.AUDIT_FILE.level=INFO
handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
handler.AUDIT_FILE.suffix=.yyyy-MM-dd
handler.AUDIT_FILE.append=true
handler.AUDIT_FILE.autoFlush=true
handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
handler.AUDIT_FILE.formatter=AUDIT_PATTERN
formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.AUDIT_PATTERN.properties=pattern
formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n

View File

@ -0,0 +1,215 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>ServerB</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>1</journal-max-io>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma-separated list, no spaces, just DNS names or IPs;
it should accept IPV6
Warning: Make sure you understand your network topology, as this is meant to validate that your network is healthy.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs; any successful ping will allow the server to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>40000</page-sync-timeout>
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<broker-connections>
<amqp-connection uri="tcp://localhost:61616" name="serverA" user="artemis" password="artemis" retry-interval="100" reconnect-attempts="-1">
<mirror durable="true"/>
</amqp-connection>
</broker-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>10000</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
<address name="someQueue">
<anycast>
<queue name="someQueue" />
</anycast>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin plugin to log events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -0,0 +1,86 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.utils.pools,org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget,org.apache.activemq.artemis.core.server.impl,org.apache.activemq.artemis.utils.collections,org.apache.activemq.artemis.core.postoffice.impl
# Special logger to debug mirror Security
# Root logger level
logger.level=INFO
# ActiveMQ Artemis logger levels
# These levels are candidates to eventually debug
logger.org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.level=INFO
# if you have issues with CriticalAnalyzer, setting this as TRACE would give you extra troubleshooting information.
# but do not use it regularly as it would incur in some extra CPU usage for this diagnostic.
logger.org.apache.activemq.artemis.utils.critical.level=INFO
logger.org.eclipse.jetty.level=WARN
# Root logger handlers
logger.handlers=FILE,CONSOLE
# to enable audit change the level to INFO
logger.org.apache.activemq.audit.base.level=ERROR
logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
logger.org.apache.activemq.audit.base.useParentHandlers=false
logger.org.apache.activemq.audit.resource.level=ERROR
logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
logger.org.apache.activemq.audit.resource.useParentHandlers=false
logger.org.apache.activemq.audit.message.level=ERROR
logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
logger.org.apache.activemq.audit.message.useParentHandlers=false
# Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=TRACE
handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN
# File handler configuration
handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
handler.FILE.level=TRACE
handler.FILE.properties=suffix,append,autoFlush,fileName
handler.FILE.suffix=.yyyy-MM-dd
handler.FILE.append=true
handler.FILE.autoFlush=true
handler.FILE.fileName=${artemis.instance}/log/artemis.log
handler.FILE.formatter=PATTERN
# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
formatter.PATTERN.pattern=[%t] %d %-5p [%c] %s%E%n
#Audit logger
handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
handler.AUDIT_FILE.level=INFO
handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
handler.AUDIT_FILE.suffix=.yyyy-MM-dd
handler.AUDIT_FILE.append=true
handler.AUDIT_FILE.autoFlush=true
handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
handler.AUDIT_FILE.formatter=AUDIT_PATTERN
formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.AUDIT_PATTERN.properties=pattern
formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.util.ServerUtil;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PagedMirrorSmokeTest extends SmokeTestBase {
// Change this to true to generate print-data output in certain cases of this test
private static final boolean PRINT_DATA = false;
private static final Logger logger = Logger.getLogger(PagedMirrorSmokeTest.class);
public static final String SERVER_NAME_A = "brokerConnect/pagedA";
public static final String SERVER_NAME_B = "brokerConnect/pagedB";
Process processB;
Process processA;
@Before
public void beforeClass() throws Exception {
cleanupData(SERVER_NAME_A);
cleanupData(SERVER_NAME_B);
processB = startServer(SERVER_NAME_B, 1, 0);
processA = startServer(SERVER_NAME_A, 0, 0);
ServerUtil.waitForServerToStart(1, "B", "B", 30000);
ServerUtil.waitForServerToStart(0, "A", "A", 30000);
}
@Test
public void testPaged() throws Throwable {
String sendURI = "tcp://localhost:61616";
String consumeURI = "tcp://localhost:61616";
String secondConsumeURI = "tcp://localhost:61617";
File countJournalLocation = new File(getServerLocation(SERVER_NAME_A), "data/journal");
File countJournalLocationB = new File(getServerLocation(SERVER_NAME_B), "data/journal");
Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
String protocol = "amqp";
ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI);
ConnectionFactory secondConsumeCF = CFUtil.createConnectionFactory(protocol, secondConsumeURI);
String bodyBuffer;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 1024; i++) {
buffer.append("*");
}
bodyBuffer = buffer.toString();
}
int NUMBER_OF_MESSAGES = 200;
int ACK_I = 77;
try (Connection sendConnection = sendCF.createConnection()) {
Session sendSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = sendSession.createQueue("someQueue");
MessageProducer producer = sendSession.createProducer(jmsQueue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sendSession.createTextMessage(bodyBuffer);
message.setIntProperty("i", i);
producer.send(message);
}
sendSession.commit();
}
Thread.sleep(500);
try (Connection consumeConnection = consumeCF.createConnection()) {
Session consumeSession = consumeConnection.createSession(false, 101); // individual ack
Queue jmsQueue = consumeSession.createQueue("someQueue");
MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
consumeConnection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(6000);
if (message.getIntProperty("i") == ACK_I) {
message.acknowledge();
}
}
Assert.assertNull(consumer.receiveNoWait());
}
Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
Wait.assertEquals(1, () -> acksCount(countJournalLocationB), 5000, 1000);
try (Connection consumeConnection = secondConsumeCF.createConnection()) {
Session consumeSession = consumeConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue jmsQueue = consumeSession.createQueue("someQueue");
MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
consumeConnection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
TextMessage message = (TextMessage) consumer.receive(6000);
Assert.assertNotNull(message);
Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
}
Assert.assertNull(consumer.receiveNoWait());
}
}
private int acksCount(File countJournalLocation) throws Exception {
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
return acksCount != null ? acksCount.get() : 0;
}
}