ARTEMIS-4193 Large Message Files orphaned after server killed

This commit is contained in:
Clebert Suconic 2023-03-02 14:18:04 -05:00 committed by clebertsuconic
parent aae65fd527
commit 9f1927d3fa
5 changed files with 320 additions and 5 deletions

View File

@ -447,7 +447,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
public long storePendingLargeMessage(final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = generateID();
messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
if (logger.isTraceEnabled()) {
logger.trace("Storing pending large message for messageID={} on recordID={}", messageID, recordID);
}
// the pending record has to be stored and synced before the large message file is created
messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, null);
if (logger.isTraceEnabled()) {
logger.trace("...Stored pending large message for messageID={} on recordID={}", messageID, recordID);
}
return recordID;
}
@ -569,10 +576,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
// We do this here to avoid a case where the replication gets a list without this file
// to avoid a race
largeMessage.validateFile();
if (largeMessage.toMessage().isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
@ -580,6 +583,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
largeMessage.setPendingRecordID(pendingRecordID);
}
// the file has to be created after te record is stored
largeMessage.validateFile();
return largeMessage;
}

View File

@ -247,6 +247,31 @@
</args>
</configuration>
</execution>
<!-- Used on LargeMessageInterruptTest -->
<execution>
<phase>test-compile</phase>
<id>create-lminterrupt</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/lminterrupt</instance>
<configuration>${basedir}/target/classes/servers/lminterrupt</configuration>
<args>
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
<arg>--queues</arg>
<arg>LargeMessageInterruptTest</arg>
<arg>--name</arg>
<arg>lminterrupt</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.apache.org/schema">
<connector connector-port="1099"/>
<authorisation>
<allowlist>
<entry domain="hawtio"/>
</allowlist>
<default-access>
<access method="list*" roles="amq"/>
<access method="get*" roles="amq"/>
<access method="is*" roles="amq"/>
<access method="set*" roles="amq"/>
<access method="*" roles="amq"/>
</default-access>
<role-access>
<match domain="org.apache.activemq.artemis">
<access method="list*" roles="amq"/>
<access method="get*" roles="amq"/>
<access method="is*" roles="amq"/>
<access method="set*" roles="amq"/>
<!-- Note count and browse are need to access the browse tab in the console-->
<access method="browse*" roles="amq"/>
<access method="count*" roles="amq"/>
<access method="*" roles="amq"/>
</match>
<!--example of how to configure a specific object-->
<!--<match domain="org.apache.activemq.artemis" key="subcomponent=queues">
<access method="list*" roles="view,update,amq"/>
<access method="get*" roles="view,update,amq"/>
<access method="is*" roles="view,update,amq"/>
<access method="set*" roles="update,amq"/>
<access method="*" roles="amq"/>
</match>-->
</role-access>
</authorisation>
</management-context>

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.soak;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
@ -28,8 +29,11 @@ import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@ -152,6 +156,34 @@ public class SoakTestBase extends ActiveMQTestBase {
throw lastException;
}
protected static QueueControl getQueueControl(String uri,
ObjectNameBuilder builder,
String address,
String queueName,
RoutingType routingType,
long timeout) throws Throwable {
long expireLoop = System.currentTimeMillis() + timeout;
Throwable lastException = null;
do {
try {
JMXConnector connector = newJMXFactory(uri);
ObjectName objectQueueName = builder.getQueueObjectName(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType);
QueueControl queueControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), objectQueueName, QueueControl.class, false);
queueControl.getMessagesAcknowledged(); // making one call
return queueControl;
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
lastException = e;
Thread.sleep(500);
}
}
while (expireLoop > System.currentTimeMillis());
throw lastException;
}
protected static JMXConnector getJmxConnector(String hostname, int port) throws MalformedURLException {
// Without this, the RMI server would bind to the default interface IP (the user's local IP mostly)
System.setProperty("java.rmi.server.hostname", hostname);

View File

@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.interruptlm;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// This is used to kill a server and make sure the server will remove any pending files.
public class LargeMessageInterruptTest extends SoakTestBase {
public static final String SERVER_NAME_0 = "lminterrupt";
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT_0 = 1099;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "lminterrupt", true);
Process serverProcess;
public ConnectionFactory createConnectionFactory(String protocol) {
return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
}
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
serverProcess = startServer(SERVER_NAME_0, 0, 30000);
disableCheckThread();
}
@Test
public void testInterruptLargeMessageAMQPTX() throws Throwable {
testInterruptLM("AMQP", true);
}
@Test
public void testInterruptLargeMessageCORETX() throws Throwable {
testInterruptLM("CORE", true);
}
@Test
public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
testInterruptLM("AMQP", false);
}
@Test
public void testInterruptLargeMessageCORENonTX() throws Throwable {
testInterruptLM("CORE", false);
}
private void testInterruptLM(String protocol, boolean tx) throws Throwable {
final int BODY_SIZE = 500 * 1024;
final int NUMBER_OF_MESSAGES = 10; // this is per producer
final int SENDING_THREADS = 10;
CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
final AtomicInteger produced = new AtomicInteger(0);
final ConnectionFactory factory = createConnectionFactory(protocol);
final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
final CountDownLatch killAt = new CountDownLatch(40);
ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS);
runAfter(executorService::shutdownNow);
String queueName = "LargeMessageInterruptTest";
String body;
{
StringBuffer buffer = new StringBuffer();
while (buffer.length() < BODY_SIZE) {
buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I DON'T REALLY CARE ");
}
body = buffer.toString();
}
for (int i = 0; i < SENDING_THREADS; i++) {
executorService.execute(() -> {
int numberOfMessages = 0;
try {
Connection connection = factory.createConnection();
Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(queueName));
startFlag.await(10, TimeUnit.SECONDS);
while (numberOfMessages < NUMBER_OF_MESSAGES) {
try {
producer.send(session.createTextMessage(body));
if (tx) {
session.commit();
}
produced.incrementAndGet();
killAt.countDown();
if (numberOfMessages++ % 10 == 0) {
logger.info("Sent {}", numberOfMessages);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
logger.warn(e.getMessage(), e);
try {
connection.close();
} catch (Throwable ignored) {
}
for (int retryNumber = 0; retryNumber < 100; retryNumber++) {
try {
Connection ctest = factory.createConnection();
ctest.close();
break;
} catch (Throwable retry) {
Thread.sleep(100);
}
}
connection = factory.createConnection();
session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue(queueName));
connection.start();
}
}
} catch (Exception e) {
logger.warn("Error getting the initial connection", e);
errors.incrementAndGet();
}
logger.info("Done sending");
done.countDown();
});
}
Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
serverProcess.destroyForcibly();
serverProcess = startServer(SERVER_NAME_0, 0, 0);
QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000);
Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
long numberOfMessages = queueControl.getMessageCount();
logger.info("there are {} messages", numberOfMessages);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
connection.start();
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
}
}
File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
Assert.assertTrue(lmFolder.exists());
Wait.assertEquals(0, () -> lmFolder.listFiles().length);
}
}