ARTEMIS-4776 Pages may leak as open on Replicated Target
PagingStore is supposed to send an event to replica on every file that is closed. There are a few situation where the sendClose is being missed and that could generate leaks on the target
This commit is contained in:
parent
f5973d53e6
commit
49189cd7e6
|
@ -231,21 +231,21 @@ public final class Page {
|
|||
return isOpen;
|
||||
}
|
||||
|
||||
public void close(boolean sendEvent) throws Exception {
|
||||
close(sendEvent, true);
|
||||
public void close(boolean sendReplicaClose) throws Exception {
|
||||
close(sendReplicaClose, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* sendEvent means it's a close happening from a major event such moveNext.
|
||||
* While reading the cache we don't need (and shouldn't inform the backup
|
||||
*/
|
||||
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
|
||||
public synchronized void close(boolean sendReplicaClose, boolean waitSync) throws Exception {
|
||||
if (readFileBuffer != null) {
|
||||
fileFactory.releaseDirectBuffer(readFileBuffer);
|
||||
readFileBuffer = null;
|
||||
}
|
||||
|
||||
if (sendEvent && storageManager != null) {
|
||||
if (sendReplicaClose && storageManager != null) {
|
||||
storageManager.pageClosed(storeName, pageId);
|
||||
}
|
||||
file.close(waitSync, waitSync);
|
||||
|
|
|
@ -577,7 +577,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
final Page page = currentPage;
|
||||
if (page != null) {
|
||||
page.close(false);
|
||||
page.close(true);
|
||||
currentPage = null;
|
||||
}
|
||||
}
|
||||
|
@ -994,7 +994,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
|
||||
returnPage = currentPage;
|
||||
returnPage.close(false);
|
||||
returnPage.close(true);
|
||||
resetCurrentPage(null);
|
||||
|
||||
// The current page is empty... which means we reached the end of the pages
|
||||
|
|
|
@ -902,6 +902,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
this.executor = executor2;
|
||||
}
|
||||
|
||||
public ConcurrentMap<SimpleString, ConcurrentMap<Long, Page>> getPageIndex() {
|
||||
return pageIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
||||
*/
|
||||
|
|
|
@ -778,6 +778,9 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
} finally {
|
||||
if (file.isOpen())
|
||||
file.close();
|
||||
if (pageStore != null) {
|
||||
sendReplicatePacket(new ReplicationPageEventMessage(pageStore, id, false, remotingConnection.isVersionUsingLongOnPageReplication()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/*false
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -70,7 +70,7 @@ public class HelperCreate extends HelperBase {
|
|||
|
||||
private boolean clustered = false;
|
||||
|
||||
private boolean slave = false;
|
||||
private boolean backup = false;
|
||||
|
||||
private String staticCluster;
|
||||
|
||||
|
@ -184,12 +184,12 @@ public class HelperCreate extends HelperBase {
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean isSlave() {
|
||||
return slave;
|
||||
public boolean isBackup() {
|
||||
return backup;
|
||||
}
|
||||
|
||||
public HelperCreate setSlave(boolean slave) {
|
||||
this.slave = slave;
|
||||
public HelperCreate setBackup(boolean backup) {
|
||||
this.backup = backup;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -301,8 +301,8 @@ public class HelperCreate extends HelperBase {
|
|||
add(listCommands, "--no-web");
|
||||
}
|
||||
|
||||
if (slave) {
|
||||
add(listCommands, "--slave");
|
||||
if (backup) {
|
||||
add(listCommands, "--backup");
|
||||
}
|
||||
|
||||
if (replicated) {
|
||||
|
|
|
@ -16,13 +16,34 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class NettyReplicatedFailoverTest extends NettyFailoverTest {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
@Override
|
||||
protected TestableServer createTestableServer(Configuration config) {
|
||||
return new SameProcessActiveMQServer(createServer(true, config));
|
||||
|
@ -49,4 +70,71 @@ public class NettyReplicatedFailoverTest extends NettyFailoverTest {
|
|||
protected final void crash(ClientSession... sessions) throws Exception {
|
||||
crash(true, sessions);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPagedInSync() throws Exception {
|
||||
|
||||
String queueName = "testPagedInSync";
|
||||
|
||||
ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
|
||||
try (Connection conn = factory.createConnection()) {
|
||||
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.send(session.createTextMessage("hello"));
|
||||
session.commit();
|
||||
|
||||
org.apache.activemq.artemis.core.server.Queue serverQueue = primaryServer.getServer().locateQueue(queueName);
|
||||
Assert.assertNotNull(serverQueue);
|
||||
|
||||
serverQueue.getPagingStore().startPaging();
|
||||
|
||||
for (int i = 0; i < 50; i++) {
|
||||
producer.send(session.createTextMessage("hello"));
|
||||
session.commit();
|
||||
serverQueue.getPagingStore().forceAnotherPage();
|
||||
}
|
||||
backupServer.stop();
|
||||
backupServer.start();
|
||||
Wait.assertTrue(backupServer.getServer()::isReplicaSync);
|
||||
|
||||
SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backupServer.getServer().getActivation();
|
||||
Map<Long, Page> currentPages = activation.getReplicationEndpoint().getPageIndex().get(SimpleString.toSimpleString(queueName));
|
||||
|
||||
logger.info("There are {} page files open", currentPages.size());
|
||||
Wait.assertTrue(() -> currentPages.size() <= 1, 10_000);
|
||||
|
||||
producer.send(session.createTextMessage("on currentPage"));
|
||||
session.commit();
|
||||
|
||||
PagingStore store = primaryServer.getServer().getPagingManager().getPageStore(SimpleString.toSimpleString(queueName));
|
||||
Page currentPage = store.getCurrentPage();
|
||||
logger.info("Page {}", currentPage.getPageId());
|
||||
|
||||
Page depaged = null;
|
||||
for (; ; ) {
|
||||
depaged = store.depage();
|
||||
if (depaged == null || currentPage.getPageId() == depaged.getPageId()) {
|
||||
break;
|
||||
}
|
||||
logger.info("depage :: {} and currentPageID={}", depaged.getPageId(), currentPage.getPageId());
|
||||
}
|
||||
|
||||
Assert.assertNotNull(depaged);
|
||||
|
||||
logger.info("Depaged:: {}", depaged.getPageId());
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
producer.send(session.createTextMessage("on current page"));
|
||||
session.commit();
|
||||
store.depage();
|
||||
}
|
||||
|
||||
logger.info("Size:: {}", currentPages.size());
|
||||
|
||||
Wait.assertTrue(() -> currentPages.size() <= 1, 10_000);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -69,13 +69,13 @@ public class CheckTest extends SmokeTestBase {
|
|||
|
||||
{
|
||||
HelperCreate cliCreateServer = new HelperCreate();
|
||||
cliCreateServer.setSharedStore(true).setSlave(false).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61716").setArtemisInstance(server0Location);
|
||||
cliCreateServer.setSharedStore(true).setBackup(false).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61716").setArtemisInstance(server0Location);
|
||||
cliCreateServer.createServer();
|
||||
}
|
||||
|
||||
{
|
||||
HelperCreate cliCreateServer = new HelperCreate();
|
||||
cliCreateServer.setSharedStore(true).setSlave(true).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61616").setPortOffset(100).setArtemisInstance(server1Location);
|
||||
cliCreateServer.setSharedStore(true).setBackup(true).setSharedStore(true).setDataFolder("./target/check-test/data").setFailoverOnShutdown(true).setStaticCluster("tcp://localhost:61616").setPortOffset(100).setArtemisInstance(server1Location);
|
||||
cliCreateServer.createServer();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,419 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import java.io.File;
|
||||
import java.io.StringWriter;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
|
||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||
import org.apache.activemq.artemis.util.ServerUtil;
|
||||
import org.apache.activemq.artemis.utils.FileUtil;
|
||||
import org.apache.activemq.artemis.utils.TestParameters;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
|
||||
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ReplicatedMirrorTargetTest extends SoakTestBase {
|
||||
|
||||
private static final String TEST_NAME = "REPLICATED_MIRROR_SOAK";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
// Set this to true and log4j will be configured with some relevant log.trace for the AckManager at the server's
|
||||
private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false"));
|
||||
private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_000);
|
||||
|
||||
// By default consuming 90% of the messages
|
||||
private static final int NUMBER_MESSAGES_RECEIVE = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES_RECEIVE", 2_000);
|
||||
private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 200);
|
||||
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 200);
|
||||
|
||||
// If -1 means to never kill the target broker
|
||||
private static final int KILL_INTERVAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 1_000);
|
||||
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 300_000);
|
||||
private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
|
||||
|
||||
/*
|
||||
* Time each consumer takes to process a message received to allow some messages accumulating.
|
||||
* This sleep happens right before the commit.
|
||||
*/
|
||||
private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
|
||||
|
||||
private static final String TOPIC_NAME = "topicTest";
|
||||
|
||||
private static String body;
|
||||
|
||||
static {
|
||||
StringWriter writer = new StringWriter();
|
||||
while (writer.getBuffer().length() < 30 * 1024) {
|
||||
writer.append("The sky is blue, ..... watch out for poop from the birds though!...");
|
||||
}
|
||||
body = writer.toString();
|
||||
}
|
||||
|
||||
public static final String DC1_NODE = "ReplicatedMirrorTargetTest/DC1";
|
||||
public static final String DC2_NODE = "ReplicatedMirrorTargetTest/DC2";
|
||||
public static final String DC2_REPLICA_NODE = "ReplicatedMirrorTargetTest/DC2_REPLICA";
|
||||
|
||||
volatile Process processDC1;
|
||||
volatile Process processDC2;
|
||||
volatile Process processDC2_REPLICA;
|
||||
|
||||
@After
|
||||
public void destroyServers() throws Exception {
|
||||
if (processDC1 != null) {
|
||||
processDC1.destroyForcibly();
|
||||
processDC1.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC1 = null;
|
||||
}
|
||||
if (processDC2 != null) {
|
||||
processDC2.destroyForcibly();
|
||||
processDC2.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC2 = null;
|
||||
}
|
||||
|
||||
if (processDC2_REPLICA != null) {
|
||||
processDC2_REPLICA.destroyForcibly();
|
||||
processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC2_REPLICA = null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final String DC1_URI = "tcp://localhost:61616";
|
||||
private static final String DC2_URI = "tcp://localhost:61618";
|
||||
|
||||
private static final String DC2_REPLICA_URI = "tcp://localhost:61619";
|
||||
|
||||
private static void createServer(String serverName,
|
||||
String connectionName,
|
||||
String mirrorURI,
|
||||
int porOffset,
|
||||
boolean paging,
|
||||
boolean replicated,
|
||||
String clusterStatic) throws Exception {
|
||||
File serverLocation = getFileServerLocation(serverName);
|
||||
deleteDirectory(serverLocation);
|
||||
|
||||
HelperCreate cliCreateServer = new HelperCreate();
|
||||
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
|
||||
cliCreateServer.setNoWeb(false);
|
||||
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
|
||||
cliCreateServer.addArgs("--addresses", TOPIC_NAME);
|
||||
cliCreateServer.setPortOffset(porOffset);
|
||||
if (replicated) {
|
||||
cliCreateServer.setReplicated(true);
|
||||
cliCreateServer.setStaticCluster(clusterStatic);
|
||||
cliCreateServer.setClustered(true);
|
||||
} else {
|
||||
cliCreateServer.setClustered(false);
|
||||
}
|
||||
|
||||
cliCreateServer.createServer();
|
||||
|
||||
Properties brokerProperties = new Properties();
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
|
||||
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
|
||||
brokerProperties.put("largeMessageSync", "false");
|
||||
|
||||
if (paging) {
|
||||
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageMessages", "2000");
|
||||
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
|
||||
brokerProperties.put("addressSettings.#.prefetchPageMessages", "500");
|
||||
}
|
||||
// if we don't use pageTransactions we may eventually get a few duplicates
|
||||
brokerProperties.put("mirrorPageTransaction", "true");
|
||||
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
|
||||
saveProperties(brokerProperties, brokerPropertiesFile);
|
||||
|
||||
File brokerXml = new File(serverLocation, "/etc/broker.xml");
|
||||
Assert.assertTrue(brokerXml.exists());
|
||||
// Adding redistribution delay to broker configuration
|
||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
|
||||
|
||||
if (TRACE_LOGS) {
|
||||
replaceLogs(serverLocation);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static void replaceLogs(File serverLocation) throws Exception {
|
||||
File log4j = new File(serverLocation, "/etc/log4j2.properties");
|
||||
Assert.assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" +
|
||||
"\n" + "logger.endpoint.name=org.apache.activemq.artemis.core.replication.ReplicationEndpoint\n"
|
||||
+ "logger.endpoint.level=DEBUG\n"
|
||||
+ "appender.console.filter.threshold.type = ThresholdFilter\n"
|
||||
+ "appender.console.filter.threshold.level = info"));
|
||||
}
|
||||
|
||||
private static void createBackupServer(String serverName,
|
||||
int porOffset,
|
||||
String clusterStatic) throws Exception {
|
||||
File serverLocation = getFileServerLocation(serverName);
|
||||
deleteDirectory(serverLocation);
|
||||
|
||||
HelperCreate cliCreateServer = new HelperCreate();
|
||||
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
|
||||
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
|
||||
cliCreateServer.setNoWeb(false);
|
||||
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE);
|
||||
cliCreateServer.addArgs("--addresses", TOPIC_NAME);
|
||||
cliCreateServer.setPortOffset(porOffset);
|
||||
cliCreateServer.setClustered(true);
|
||||
cliCreateServer.setReplicated(true);
|
||||
cliCreateServer.setBackup(true);
|
||||
cliCreateServer.setStaticCluster(clusterStatic);
|
||||
cliCreateServer.createServer();
|
||||
|
||||
File brokerXml = new File(serverLocation, "/etc/broker.xml");
|
||||
Assert.assertTrue(brokerXml.exists());
|
||||
// Adding redistribution delay to broker configuration
|
||||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
|
||||
|
||||
if (TRACE_LOGS) {
|
||||
replaceLogs(serverLocation);
|
||||
}
|
||||
}
|
||||
|
||||
public static void createRealServers(boolean paging) throws Exception {
|
||||
createServer(DC1_NODE, "mirror", DC2_URI, 0, paging, false, null);
|
||||
createServer(DC2_NODE, "mirror", DC1_URI, 2, paging, true, DC2_REPLICA_URI);
|
||||
createBackupServer(DC2_REPLICA_NODE, 3, DC2_URI);
|
||||
}
|
||||
|
||||
private void startServers() throws Exception {
|
||||
processDC1 = startServer(DC1_NODE, -1, -1, new File(getServerLocation(DC1_NODE), "broker.properties"));
|
||||
processDC2 = startServer(DC2_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
|
||||
|
||||
ServerUtil.waitForServerToStart(0, 10_000);
|
||||
ServerUtil.waitForServerToStart(2, 10_000);
|
||||
|
||||
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1, new File(getServerLocation(DC2_NODE), "broker.properties"));
|
||||
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMirrorOnReplica() throws Exception {
|
||||
createRealServers(true);
|
||||
startServers();
|
||||
|
||||
|
||||
Assert.assertTrue(KILL_INTERVAL > SEND_COMMIT || KILL_INTERVAL < 0);
|
||||
|
||||
String clientIDA = "nodeA";
|
||||
String clientIDB = "nodeB";
|
||||
String subscriptionID = "my-order";
|
||||
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
|
||||
|
||||
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_URI);
|
||||
|
||||
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT);
|
||||
consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT);
|
||||
|
||||
SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null, null);
|
||||
SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null, null);
|
||||
|
||||
runAfter(() -> managementDC1.close());
|
||||
runAfter(() -> managementDC2.close());
|
||||
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID));
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID));
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID));
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID));
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(3);
|
||||
runAfter(executorService::shutdownNow);
|
||||
CountDownLatch consumerDone = new CountDownLatch(2);
|
||||
executorService.execute(() -> {
|
||||
try {
|
||||
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
} finally {
|
||||
consumerDone.countDown();
|
||||
}
|
||||
});
|
||||
executorService.execute(() -> {
|
||||
try {
|
||||
consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, NUMBER_MESSAGES_RECEIVE, false, false, RECEIVE_COMMIT);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
} finally {
|
||||
consumerDone.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
OrderedExecutor restartExeuctor = new OrderedExecutor(executorService);
|
||||
AtomicBoolean running = new AtomicBoolean(true);
|
||||
runAfter(() -> running.set(false));
|
||||
|
||||
try (Connection connection = connectionFactoryDC1A.createConnection()) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME));
|
||||
for (int i = 0; i < NUMBER_MESSAGES; i++) {
|
||||
TextMessage message = session.createTextMessage(body);
|
||||
message.setIntProperty("i", i);
|
||||
message.setBooleanProperty("large", false);
|
||||
producer.send(message);
|
||||
if (i > 0 && i % SEND_COMMIT == 0) {
|
||||
logger.info("Sent {} messages", i);
|
||||
session.commit();
|
||||
}
|
||||
if (KILL_INTERVAL > 0 && i > 0 && i % KILL_INTERVAL == 0) {
|
||||
restartExeuctor.execute(() -> {
|
||||
if (running.get()) {
|
||||
try {
|
||||
logger.info("Restarting target server (DC2)");
|
||||
if (processDC2_REPLICA != null) {
|
||||
processDC2_REPLICA.destroyForcibly();
|
||||
processDC2_REPLICA.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC2_REPLICA = null;
|
||||
}
|
||||
if (processDC2 != null) {
|
||||
processDC2.destroyForcibly();
|
||||
processDC2.waitFor(1, TimeUnit.MINUTES);
|
||||
processDC2 = null;
|
||||
}
|
||||
processDC2 = startServer(DC2_NODE, 2, 10_000, new File(getServerLocation(DC2_NODE), "broker.properties"));
|
||||
processDC2_REPLICA = startServer(DC2_REPLICA_NODE, -1, -1);
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
running.set(false);
|
||||
}
|
||||
|
||||
consumerDone.await(SNF_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
|
||||
Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
|
||||
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
Wait.assertEquals(NUMBER_MESSAGES - NUMBER_MESSAGES_RECEIVE, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
|
||||
|
||||
destroyServers();
|
||||
|
||||
// counting the number of records on duplicate cache
|
||||
// to validate if ARTEMIS-4765 is fixed
|
||||
ActiveMQServer server = createServer(true, false);
|
||||
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal");
|
||||
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings");
|
||||
server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging");
|
||||
server.start();
|
||||
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
|
||||
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
|
||||
AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
|
||||
Assert.assertNotNull(duplicateRecordsCount);
|
||||
// 1000 credits by default
|
||||
Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
|
||||
|
||||
}
|
||||
|
||||
private static void consume(ConnectionFactory factory,
|
||||
String clientID,
|
||||
String subscriptionID,
|
||||
int start,
|
||||
int numberOfMessages,
|
||||
boolean expectEmpty,
|
||||
boolean assertBody,
|
||||
int batchCommit) throws Exception {
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
connection.setClientID(clientID);
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Topic topic = session.createTopic(TOPIC_NAME);
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createDurableConsumer(topic, subscriptionID);
|
||||
boolean failed = false;
|
||||
|
||||
int pendingCommit = 0;
|
||||
|
||||
for (int i = start; i < start + numberOfMessages; i++) {
|
||||
TextMessage message = (TextMessage) consumer.receive(10_000);
|
||||
Assert.assertNotNull(message);
|
||||
logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
|
||||
if (message.getIntProperty("i") != i) {
|
||||
failed = true;
|
||||
logger.warn("Expected message {} but got {}", i, message.getIntProperty("i"));
|
||||
}
|
||||
logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
|
||||
pendingCommit++;
|
||||
if (pendingCommit >= batchCommit) {
|
||||
if (CONSUMER_PROCESSING_TIME > 0) {
|
||||
Thread.sleep(CONSUMER_PROCESSING_TIME);
|
||||
}
|
||||
logger.info("received {}", i);
|
||||
session.commit();
|
||||
pendingCommit = 0;
|
||||
}
|
||||
}
|
||||
session.commit();
|
||||
|
||||
Assert.assertFalse(failed);
|
||||
|
||||
if (expectEmpty) {
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public long getCount(SimpleManagement simpleManagement, String queue) {
|
||||
try {
|
||||
long value = simpleManagement.getMessageCountOnQueue(queue);
|
||||
logger.info("Queue {} count = {}", queue, value);
|
||||
return value;
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue