ARTEMIS-4564 Cleanup large message files in case of mirror duplicate detection

This commit is contained in:
Clebert Suconic 2024-01-11 11:05:20 -05:00 committed by clebertsuconic
parent c6a6e036a4
commit 1887b3fb8e
2 changed files with 211 additions and 0 deletions

View File

@ -504,6 +504,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
byte[] duplicateIDBytes = ByteUtil.longToBytes(internalIDLong); byte[] duplicateIDBytes = ByteUtil.longToBytes(internalIDLong);
if (duplicateIDCache.contains(duplicateIDBytes)) { if (duplicateIDCache.contains(duplicateIDBytes)) {
message.usageDown(); // large messages would be removed here
flow(); flow();
return false; return false;
} }

View File

@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InterruptedLargeMessageTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static String largeBody;
static {
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 1024 * 1024) {
writer.append("This is a large string ..... ");
}
largeBody = writer.toString();
}
private static final String QUEUE_NAME = "myQueue";
public static final String DC1_NODE_A = "interruptLarge/DC1";
public static final String DC2_NODE_A = "interruptLarge/DC2";
Process processDC1_node_A;
Process processDC2_node_A;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static void createServer(String serverName,
String connectionName,
String mirrorURI,
int porOffset) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(false);
cliCreateServer.setNoWeb(true);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
cliCreateServer.addArgs("--queues", QUEUE_NAME);
cliCreateServer.setPortOffset(porOffset);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC2_NODEA_URI, 0);
createServer(DC2_NODE_A, "mirror", DC1_NODEA_URI, 2);
}
private void startDC1() throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
}
@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC2_NODE_A);
}
@Test
public void testAMQP() throws Exception {
testInterrupt("AMQP");
}
@Test
public void testCORE() throws Exception {
testInterrupt("CORE");
}
private void testInterrupt(final String protocol) throws Exception {
startDC1();
final int numberOfMessages = 400;
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory(protocol, DC1_NODEA_URI);
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory(protocol, DC2_NODEA_URI);
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = session.createTextMessage(largeBody);
message.setIntProperty("id", i);
producer.send(message);
if (i % 10 == 0) {
logger.debug("Sent {} messages", i);
session.commit();
}
}
session.commit();
startDC2();
// Waiting for at least one large message file in the target server
Wait.assertTrue(() -> getNumberOfLargeMessages(DC2_NODE_A) > 0, 5000, 100);
stopDC2();
startDC2();
}
try (Connection connection = connectionFactoryDC2A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("id"));
if (i % 10 == 0) {
session.commit();
logger.debug("Received {} messages", i);
}
}
session.commit();
}
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, snfQueue));
Wait.assertEquals(0, () -> getCount(simpleManagementDC2A, QUEUE_NAME));
Wait.assertEquals(0, () -> getCount(simpleManagementDC1A, QUEUE_NAME));
Wait.assertEquals(0, () -> getNumberOfLargeMessages(DC1_NODE_A), 5000);
Wait.assertEquals(0, () -> getNumberOfLargeMessages(DC2_NODE_A), 5000);
}
int getNumberOfLargeMessages(String serverName) throws Exception {
File lmFolder = new File(getServerLocation(serverName) + "/data/large-messages");
Assert.assertTrue(lmFolder.exists());
return lmFolder.list().length;
}
private void stopDC2() throws Exception {
processDC2_node_A.destroyForcibly();
Assert.assertTrue(processDC2_node_A.waitFor(10, TimeUnit.SECONDS));
}
private void startDC2() throws Exception {
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
ServerUtil.waitForServerToStart(2, 10_000);
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
}
}