ARTEMIS-4798 Use original message to calculate the Mirror SNF size in order to start paging
This commit is contained in:
parent
fad1f5274c
commit
5be983392c
|
@ -145,8 +145,11 @@ public class SizeAwareMetric {
|
|||
return addSize(delta, false);
|
||||
}
|
||||
|
||||
public final long addSize(final int delta, final boolean sizeOnly) {
|
||||
public final long addSize(final int delta, boolean sizeOnly) {
|
||||
return addSize(delta, sizeOnly, true);
|
||||
}
|
||||
|
||||
public final long addSize(final int delta, final boolean sizeOnly, boolean affectCallbacks) {
|
||||
if (delta == 0) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("SizeAwareMetric ignored with size 0", new Exception("trace"));
|
||||
|
@ -156,7 +159,7 @@ public class SizeAwareMetric {
|
|||
|
||||
changeFlag(NOT_USED, FREE);
|
||||
|
||||
if (onSizeCallback != null) {
|
||||
if (onSizeCallback != null && affectCallbacks) {
|
||||
try {
|
||||
onSizeCallback.add(delta, sizeOnly);
|
||||
} catch (Throwable e) {
|
||||
|
|
|
@ -809,6 +809,12 @@ public interface Message {
|
|||
|
||||
int getMemoryEstimate();
|
||||
|
||||
/** The first estimate that's been calculated without any updates. */
|
||||
default int getOriginalEstimate() {
|
||||
// For Core Protocol we always use the same estimate
|
||||
return getMemoryEstimate();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the size of the message when persisted on disk which is used for metrics tracking
|
||||
* Note that even if the message itself is not persisted on disk (ie non-durable) this value is
|
||||
|
|
|
@ -572,6 +572,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
public int getMemoryEstimate() {
|
||||
if (memoryEstimate == -1) {
|
||||
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
|
||||
originalEstimate = memoryEstimate;
|
||||
}
|
||||
return memoryEstimate;
|
||||
}
|
||||
|
|
|
@ -204,6 +204,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
protected long messageID;
|
||||
protected SimpleString address;
|
||||
protected volatile int memoryEstimate = -1;
|
||||
protected volatile int originalEstimate = -1;
|
||||
protected long expiration;
|
||||
protected boolean expirationReload = false;
|
||||
protected long scheduledTime = -1;
|
||||
|
@ -674,6 +675,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
}
|
||||
encodedHeaderSize = 0;
|
||||
memoryEstimate = -1;
|
||||
originalEstimate = -1;
|
||||
scheduledTime = -1;
|
||||
encodedDeliveryAnnotationsSize = 0;
|
||||
headerPosition = VALUE_NOT_PRESENT;
|
||||
|
@ -869,6 +871,16 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
@Override
|
||||
public abstract int getMemoryEstimate();
|
||||
|
||||
@Override
|
||||
public int getOriginalEstimate() {
|
||||
if (originalEstimate < 0) {
|
||||
// getMemoryEstimate should initialize originalEstimate
|
||||
return getMemoryEstimate();
|
||||
} else {
|
||||
return originalEstimate;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
|
||||
return toPropertyMap(false, valueSizeLimit);
|
||||
|
|
|
@ -189,6 +189,7 @@ public class AMQPStandardMessage extends AMQPMessage {
|
|||
public int getMemoryEstimate() {
|
||||
if (memoryEstimate == -1) {
|
||||
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
|
||||
originalEstimate = memoryEstimate;
|
||||
}
|
||||
|
||||
return memoryEstimate;
|
||||
|
|
|
@ -93,6 +93,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
|
|||
|
||||
long getAddressSize();
|
||||
|
||||
long getAddressElements();
|
||||
|
||||
long getMaxSize();
|
||||
|
||||
int getMaxPageReadBytes();
|
||||
|
@ -179,10 +181,14 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
|
|||
* @param size
|
||||
* @param sizeOnly if false we won't increment the number of messages. (add references for example)
|
||||
*/
|
||||
void addSize(int size, boolean sizeOnly);
|
||||
void addSize(int size, boolean sizeOnly, boolean affectGlobal);
|
||||
|
||||
default void addSize(int size, boolean sizeOnly) {
|
||||
addSize(size, sizeOnly, true);
|
||||
}
|
||||
|
||||
default void addSize(int size) {
|
||||
addSize(size, false);
|
||||
addSize(size, false, true);
|
||||
}
|
||||
|
||||
boolean checkMemory(Runnable runnable, Consumer<AtomicRunnable> blockedCallback);
|
||||
|
|
|
@ -411,6 +411,11 @@ public class PagingStoreImpl implements PagingStore {
|
|||
return size.getSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAddressElements() {
|
||||
return size.getElements();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxSize() {
|
||||
if (maxSize <= 0) {
|
||||
|
@ -1119,8 +1124,8 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addSize(final int size, boolean sizeOnly) {
|
||||
long newSize = this.size.addSize(size, sizeOnly);
|
||||
public void addSize(final int size, boolean sizeOnly, boolean affectGlobal) {
|
||||
long newSize = this.size.addSize(size, sizeOnly, affectGlobal);
|
||||
boolean globalFull = pagingManager.isGlobalFull();
|
||||
|
||||
if (newSize < 0) {
|
||||
|
|
|
@ -1041,12 +1041,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
@Override
|
||||
public void refUp(MessageReference messageReference) {
|
||||
int count = messageReference.getMessage().refUp();
|
||||
PagingStore owner = (PagingStore) messageReference.getMessage().getOwner();
|
||||
if (count == 1) {
|
||||
if (messageReference.getMessage().getOwner() != null) {
|
||||
((PagingStore)messageReference.getMessage().getOwner()).addSize(messageReference.getMessageMemoryEstimate(), false);
|
||||
if (owner != null) {
|
||||
owner.addSize(messageReference.getMessageMemoryEstimate(), false);
|
||||
}
|
||||
}
|
||||
if (pagingStore != null) {
|
||||
if (owner != null && pagingStore != owner) {
|
||||
// If an AMQP message parses its properties, its size might be updated and the address will receive more bytes.
|
||||
// However, in this case, we should always use the original estimate.
|
||||
// Otherwise, we might get incorrect sizes after the update.
|
||||
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false, false);
|
||||
}
|
||||
|
||||
pagingStore.refUp(messageReference.getMessage(), count);
|
||||
}
|
||||
}
|
||||
|
@ -1054,12 +1062,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
@Override
|
||||
public void refDown(MessageReference messageReference) {
|
||||
int count = messageReference.getMessage().refDown();
|
||||
if (count == 0) {
|
||||
if (messageReference.getMessage().getOwner() != null) {
|
||||
((PagingStore)messageReference.getMessage().getOwner()).addSize(-messageReference.getMessageMemoryEstimate(), false);
|
||||
}
|
||||
PagingStore owner = (PagingStore) messageReference.getMessage().getOwner();
|
||||
if (count == 0 && owner != null) {
|
||||
owner.addSize(-messageReference.getMessageMemoryEstimate(), false);
|
||||
}
|
||||
if (pagingStore != null) {
|
||||
if (owner != null && pagingStore != owner) {
|
||||
// If an AMQP message parses its properties, its size might be updated and the address will receive more bytes.
|
||||
// However, in this case, we should always use the original estimate.
|
||||
// Otherwise, we might get incorrect sizes after the update.
|
||||
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(), false, false);
|
||||
}
|
||||
pagingStore.refDown(messageReference.getMessage(), count);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1376,5 +1376,90 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCalculationSize() throws Exception {
|
||||
testCalculationSize(false);
|
||||
}
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testCalculationSizeRestartSource() throws Exception {
|
||||
testCalculationSize(true);
|
||||
}
|
||||
|
||||
private void testCalculationSize(boolean restartSource) throws Exception {
|
||||
String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
|
||||
server.setIdentity("targetServer");
|
||||
server.start();
|
||||
|
||||
server_2 = createServer(AMQP_PORT_2, false);
|
||||
server_2.setIdentity("server_2");
|
||||
server_2.getConfiguration().setName("server2");
|
||||
|
||||
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
|
||||
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
|
||||
replica.setName("theReplica");
|
||||
amqpConnection.addElement(replica);
|
||||
server_2.getConfiguration().addAMQPConnection(amqpConnection);
|
||||
server_2.getConfiguration().setName("server_2");
|
||||
|
||||
int NUMBER_OF_MESSAGES = 1;
|
||||
|
||||
server_2.start();
|
||||
Wait.assertTrue(server_2::isStarted);
|
||||
|
||||
// We create the address to avoid auto delete on the queue
|
||||
server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
|
||||
server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
|
||||
|
||||
Queue queueOnServer2 = locateQueue(server_2, getQueueName());
|
||||
|
||||
server.stop();
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
Message message = session.createTextMessage(getText(false, i));
|
||||
message.setIntProperty("i", i);
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
if (restartSource) {
|
||||
server_2.stop();
|
||||
// Have to disable queueCreation, otherwise once the server restarts more data will be added to the SNF queue what will messup with the Assertions.
|
||||
// The idea is to make sure the reference counting between two addresses will act correctly
|
||||
replica.setQueueCreation(false).setQueueRemoval(false);
|
||||
server_2.start();
|
||||
}
|
||||
|
||||
Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
|
||||
assertNotNull(snfreplica);
|
||||
|
||||
logger.info("Size on queueOnServer2:: {}", queueOnServer2.getPagingStore().getAddressSize());
|
||||
logger.info("Size on SNF:: {}", snfreplica.getPagingStore().getAddressSize());
|
||||
|
||||
Wait.assertTrue(() -> queueOnServer2.getPagingStore().getAddressSize() == snfreplica.getPagingStore().getAddressSize(), 5000);
|
||||
Wait.assertTrue(() -> queueOnServer2.getPagingStore().getAddressElements() == snfreplica.getPagingStore().getAddressElements(), 5000);
|
||||
|
||||
logger.info("Size on queueOnServer2:: {}, elements={}", queueOnServer2.getPagingStore().getAddressSize(), queueOnServer2.getPagingStore().getAddressElements());
|
||||
logger.info("Size on SNF:: {}, elements={}", snfreplica.getPagingStore().getAddressSize(), snfreplica.getPagingStore().getAddressElements());
|
||||
|
||||
server.start();
|
||||
Wait.assertTrue(server::isStarted);
|
||||
|
||||
Queue queueOnServer1 = locateQueue(server, getQueueName());
|
||||
assertFalse(loggerHandler.findText("AMQ222214"));
|
||||
|
||||
Wait.assertEquals(0L, snfreplica.getPagingStore()::getAddressElements);
|
||||
Wait.assertEquals(0L, snfreplica.getPagingStore()::getAddressSize);
|
||||
|
||||
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount, 2000);
|
||||
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
|
||||
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
|
||||
|
||||
assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
|
||||
}
|
||||
|
||||
}
|
|
@ -587,25 +587,23 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
|
|||
assertEquals("KEY", lvqQueue2.getQueueConfiguration().getLastValueKey().toString());
|
||||
|
||||
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
|
||||
Connection connection1 = cf1.createConnection();
|
||||
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
connection1.start();
|
||||
|
||||
Queue queue = session1.createQueue(lvqName);
|
||||
|
||||
MessageProducer producerServer1 = session1.createProducer(queue);
|
||||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
TextMessage message = session1.createTextMessage("test");
|
||||
message.setIntProperty("i", 0);
|
||||
message.setStringProperty("KEY", "" + (i % 10));
|
||||
producerServer1.send(message);
|
||||
try (Connection connection1 = cf1.createConnection()) {
|
||||
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session1.createQueue(lvqName);
|
||||
MessageProducer producerServer1 = session1.createProducer(queue);
|
||||
connection1.start();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
TextMessage message = session1.createTextMessage("test");
|
||||
message.setIntProperty("i", 0);
|
||||
message.setStringProperty("KEY", "" + (i % 10));
|
||||
producerServer1.send(message);
|
||||
}
|
||||
}
|
||||
assertFalse(loggerHandler.findText("AMQ222214"));
|
||||
|
||||
Wait.assertEquals(10L, lvqQueue1::getMessageCount, 2000, 100);
|
||||
Wait.assertEquals(10L, lvqQueue2::getMessageCount, 2000, 100);
|
||||
|
||||
connection1.close();
|
||||
|
||||
server_2.stop();
|
||||
server.stop();
|
||||
|
|
|
@ -370,6 +370,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAddressElements() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxSize() {
|
||||
return 0;
|
||||
|
@ -501,7 +506,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addSize(int size, boolean sizeOnly) {
|
||||
public void addSize(int size, boolean sizeOnly, boolean affectGlobal) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.activemq.artemis.tests.smoke.brokerConnection;
|
|||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
@ -30,10 +29,8 @@ import javax.jms.Queue;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
|
@ -42,9 +39,13 @@ import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
|
|||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class PagedMirrorSmokeTest extends SmokeTestBase {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
// Change this to true to generate a print-data in certain cases on this test
|
||||
private static final boolean PRINT_DATA = false;
|
||||
|
||||
|
@ -93,9 +94,6 @@ public class PagedMirrorSmokeTest extends SmokeTestBase {
|
|||
String consumeURI = "tcp://localhost:61616";
|
||||
String secondConsumeURI = "tcp://localhost:61617";
|
||||
|
||||
File countJournalLocation = new File(getServerLocation(SERVER_NAME_A), "data/journal");
|
||||
File countJournalLocationB = new File(getServerLocation(SERVER_NAME_B), "data/journal");
|
||||
assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
|
||||
String protocol = "amqp";
|
||||
|
||||
ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
|
||||
|
@ -127,7 +125,7 @@ public class PagedMirrorSmokeTest extends SmokeTestBase {
|
|||
sendSession.commit();
|
||||
}
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
try (Connection consumeConnection = consumeCF.createConnection()) {
|
||||
Session consumeSession = consumeConnection.createSession(false, 101); // individual ack
|
||||
Queue jmsQueue = consumeSession.createQueue("someQueue");
|
||||
|
@ -141,8 +139,9 @@ public class PagedMirrorSmokeTest extends SmokeTestBase {
|
|||
}
|
||||
assertNull(consumer.receiveNoWait());
|
||||
}
|
||||
Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
|
||||
Wait.assertEquals(1, () -> acksCount(countJournalLocationB), 5000, 1000);
|
||||
|
||||
Wait.assertEquals(0, () -> getMessageCount(consumeURI, "$ACTIVEMQ_ARTEMIS_MIRROR_outgoing"));
|
||||
Wait.assertEquals(NUMBER_OF_MESSAGES - 1, () -> getMessageCount(secondConsumeURI, "someQueue"));
|
||||
|
||||
try (Connection consumeConnection = secondConsumeCF.createConnection()) {
|
||||
Session consumeSession = consumeConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
@ -158,11 +157,4 @@ public class PagedMirrorSmokeTest extends SmokeTestBase {
|
|||
assertNull(consumer.receiveNoWait());
|
||||
}
|
||||
}
|
||||
|
||||
private int acksCount(File countJournalLocation) throws Exception {
|
||||
HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
|
||||
AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
|
||||
return acksCount != null ? acksCount.get() : 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import java.io.File;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
|
||||
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class AccumulatedInPageSoakTest extends SoakTestBase {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static String body;
|
||||
|
||||
static {
|
||||
StringWriter writer = new StringWriter();
|
||||
while (writer.getBuffer().length() < 20 * 1024) {
|
||||
writer.append("This is a string ..... ");
|
||||
}
|
||||
body = writer.toString();
|
||||
}
|
||||
|
||||
private static final String QUEUE_NAME = "AccumulatePage";
|
||||
|
||||
public static final String DC1_NODE_A = "AccumulatedInPageSoakTest/DC1";
|
||||
public static final String DC2_NODE_A = "AccumulatedInPageSoakTest/DC2";
|
||||
|
||||
private static final String SNF_QUEUE = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
|
||||
|
||||
Process processDC1_node_A;
|
||||
Process processDC2_node_A;
|
||||
|
||||
private static String DC1_NODEA_URI = "tcp://localhost:61616";
|
||||
private static String DC2_NODEA_URI = "tcp://localhost:61618";
|
||||
|
||||
private static void createServer(String serverName,
|
||||
String connectionName,
|
||||
String mirrorURI,
|
||||
int porOffset) throws Exception {
|
||||
File serverLocation = getFileServerLocation(serverName);
|
||||
deleteDirectory(serverLocation);
|
||||
|
||||
HelperCreate cliCreateServer = new HelperCreate();
|
||||
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
|
||||
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
|
||||
cliCreateServer.setClustered(false);
|
||||
cliCreateServer.setNoWeb(false);
|
||||
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
|
||||
cliCreateServer.addArgs("--queues", QUEUE_NAME);
|
||||
cliCreateServer.addArgs("--java-memory", "512M");
|
||||
cliCreateServer.setPortOffset(porOffset);
|
||||
cliCreateServer.createServer();
|
||||
|
||||
Properties brokerProperties = new Properties();
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
|
||||
brokerProperties.put("largeMessageSync", "false");
|
||||
|
||||
brokerProperties.put("addressSettings.#.maxSizeBytes", Integer.toString(100 * 1024 * 1024));
|
||||
brokerProperties.put("addressSettings.#.addressFullMessagePolicy", "PAGING");
|
||||
|
||||
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
|
||||
saveProperties(brokerProperties, brokerPropertiesFile);
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
public static void createServers() throws Exception {
|
||||
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
|
||||
createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void cleanupServers() {
|
||||
cleanupData(DC1_NODE_A);
|
||||
cleanupData(DC2_NODE_A);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(value = 240_000L, unit = TimeUnit.MILLISECONDS)
|
||||
public void testAccumulateWhileMirrorDown() throws Exception {
|
||||
String protocol = "AMQP"; // no need to run this test using multiple protocols. this is about validating paging works correctly
|
||||
startDC1();
|
||||
|
||||
ExecutorService service = Executors.newFixedThreadPool(1);
|
||||
runAfter(service::shutdownNow);
|
||||
|
||||
final int numberOfMessages = 20_000;
|
||||
final int commitInterval = 1000;
|
||||
|
||||
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
|
||||
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI);
|
||||
|
||||
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
|
||||
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
|
||||
AtomicInteger errors = new AtomicInteger(0);
|
||||
CountDownLatch done = new CountDownLatch(1);
|
||||
|
||||
service.execute(() -> {
|
||||
|
||||
try (Connection connection = connectionFactoryDC1A.createConnection()) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
connection.start();
|
||||
Queue queue = session.createQueue(QUEUE_NAME);
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
assertNotNull(consumer.receive(5_000));
|
||||
if (i > 0 && i % commitInterval == 0) {
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
errors.incrementAndGet();
|
||||
} finally {
|
||||
done.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try (Connection connection = connectionFactoryDC1A.createConnection()) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(QUEUE_NAME);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
producer.send(session.createTextMessage(body));
|
||||
if (i > 0 && i % commitInterval == 0) {
|
||||
logger.info("Sent {}", i);
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
}
|
||||
|
||||
done.await();
|
||||
assertEquals(0, errors.get());
|
||||
|
||||
startDC2();
|
||||
Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, SNF_QUEUE), 240_000, 500);
|
||||
Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME), 60_000, 500);
|
||||
|
||||
}
|
||||
|
||||
private void startDC1() throws Exception {
|
||||
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
|
||||
ServerUtil.waitForServerToStart(0, 10_000);
|
||||
}
|
||||
|
||||
private void stopDC1() throws Exception {
|
||||
processDC1_node_A.destroyForcibly();
|
||||
assertTrue(processDC1_node_A.waitFor(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void stopDC2() throws Exception {
|
||||
processDC2_node_A.destroyForcibly();
|
||||
assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void startDC2() throws Exception {
|
||||
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
|
||||
ServerUtil.waitForServerToStart(2, 10_000);
|
||||
}
|
||||
}
|
|
@ -105,9 +105,6 @@ public class PagedSNFSoakTest extends SoakTestBase {
|
|||
brokerProperties.put("addressSettings.#.maxSizeMessages", "100");
|
||||
brokerProperties.put("addressSettings.#.addressFullMessagePolicy", "PAGING");
|
||||
|
||||
//brokerProperties.put("addressSettings.*MIRROR*.maxSizeMessages", "100");
|
||||
//brokerProperties.put("addressSettings.*MIRROR*.addressFullMessagePolicy", "BLOCK");
|
||||
|
||||
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
|
||||
saveProperties(brokerProperties, brokerPropertiesFile);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue