diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index aaecba0702..476f0776f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -447,7 +447,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
public long storePendingLargeMessage(final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = generateID();
- messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
+ if (logger.isTraceEnabled()) {
+ logger.trace("Storing pending large message for messageID={} on recordID={}", messageID, recordID);
+ }
+ // the pending record has to be stored and synced before the large message file is created
+ messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, null);
+ if (logger.isTraceEnabled()) {
+ logger.trace("...Stored pending large message for messageID={} on recordID={}", messageID, recordID);
+ }
return recordID;
}
@@ -569,10 +576,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
- // We do this here to avoid a case where the replication gets a list without this file
- // to avoid a race
- largeMessage.validateFile();
-
if (largeMessage.toMessage().isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
@@ -580,6 +583,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
largeMessage.setPendingRecordID(pendingRecordID);
}
+ // the file has to be created after te record is stored
+ largeMessage.validateFile();
+
+
return largeMessage;
}
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index 4b3e65f42f..b45ea34b33 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -247,6 +247,31 @@
+
+
+ test-compile
+ create-lminterrupt
+
+ create
+
+
+ amq
+ artemis
+ artemis
+ true
+ false
+ ${basedir}/target/lminterrupt
+ ${basedir}/target/classes/servers/lminterrupt
+
+ --java-options
+ -Djava.rmi.server.hostname=localhost
+ --queues
+ LargeMessageInterruptTest
+ --name
+ lminterrupt
+
+
+
diff --git a/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml b/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
new file mode 100644
index 0000000000..1d38e28ac9
--- /dev/null
+++ b/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
@@ -0,0 +1,52 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
index 40b54ef2cd..02dd357eff 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.soak;
import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
@@ -28,8 +29,11 @@ import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -152,6 +156,34 @@ public class SoakTestBase extends ActiveMQTestBase {
throw lastException;
}
+ protected static QueueControl getQueueControl(String uri,
+ ObjectNameBuilder builder,
+ String address,
+ String queueName,
+ RoutingType routingType,
+ long timeout) throws Throwable {
+ long expireLoop = System.currentTimeMillis() + timeout;
+ Throwable lastException = null;
+ do {
+ try {
+ JMXConnector connector = newJMXFactory(uri);
+
+ ObjectName objectQueueName = builder.getQueueObjectName(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType);
+
+ QueueControl queueControl = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), objectQueueName, QueueControl.class, false);
+ queueControl.getMessagesAcknowledged(); // making one call
+ return queueControl;
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ lastException = e;
+ Thread.sleep(500);
+ }
+ }
+ while (expireLoop > System.currentTimeMillis());
+
+ throw lastException;
+ }
+
protected static JMXConnector getJmxConnector(String hostname, int port) throws MalformedURLException {
// Without this, the RMI server would bind to the default interface IP (the user's local IP mostly)
System.setProperty("java.rmi.server.hostname", hostname);
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
new file mode 100644
index 0000000000..e784394933
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This is used to kill a server and make sure the server will remove any pending files.
+public class LargeMessageInterruptTest extends SoakTestBase {
+
+ public static final String SERVER_NAME_0 = "lminterrupt";
+ private static final String JMX_SERVER_HOSTNAME = "localhost";
+ private static final int JMX_SERVER_PORT_0 = 1099;
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+ static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "lminterrupt", true);
+ Process serverProcess;
+
+ public ConnectionFactory createConnectionFactory(String protocol) {
+ return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ }
+
+ @Before
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_0);
+ serverProcess = startServer(SERVER_NAME_0, 0, 30000);
+ disableCheckThread();
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPTX() throws Throwable {
+ testInterruptLM("AMQP", true);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORETX() throws Throwable {
+ testInterruptLM("CORE", true);
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
+ testInterruptLM("AMQP", false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORENonTX() throws Throwable {
+ testInterruptLM("CORE", false);
+ }
+
+ private void testInterruptLM(String protocol, boolean tx) throws Throwable {
+ final int BODY_SIZE = 500 * 1024;
+ final int NUMBER_OF_MESSAGES = 10; // this is per producer
+ final int SENDING_THREADS = 10;
+ CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
+ final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
+ final AtomicInteger produced = new AtomicInteger(0);
+ final ConnectionFactory factory = createConnectionFactory(protocol);
+ final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
+ final CountDownLatch killAt = new CountDownLatch(40);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS);
+ runAfter(executorService::shutdownNow);
+
+ String queueName = "LargeMessageInterruptTest";
+
+ String body;
+
+ {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < BODY_SIZE) {
+ buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I DON'T REALLY CARE ");
+ }
+ body = buffer.toString();
+ }
+
+ for (int i = 0; i < SENDING_THREADS; i++) {
+ executorService.execute(() -> {
+ int numberOfMessages = 0;
+ try {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (numberOfMessages < NUMBER_OF_MESSAGES) {
+ try {
+ producer.send(session.createTextMessage(body));
+ if (tx) {
+ session.commit();
+ }
+ produced.incrementAndGet();
+ killAt.countDown();
+ if (numberOfMessages++ % 10 == 0) {
+ logger.info("Sent {}", numberOfMessages);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+
+ logger.warn(e.getMessage(), e);
+ try {
+ connection.close();
+ } catch (Throwable ignored) {
+ }
+
+ for (int retryNumber = 0; retryNumber < 100; retryNumber++) {
+ try {
+ Connection ctest = factory.createConnection();
+ ctest.close();
+ break;
+ } catch (Throwable retry) {
+ Thread.sleep(100);
+ }
+ }
+
+ connection = factory.createConnection();
+ session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(session.createQueue(queueName));
+ connection.start();
+
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Error getting the initial connection", e);
+ errors.incrementAndGet();
+ }
+
+ logger.info("Done sending");
+ done.countDown();
+ });
+ }
+
+ Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
+ serverProcess.destroyForcibly();
+ serverProcess = startServer(SERVER_NAME_0, 0, 0);
+ QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+ Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ long numberOfMessages = queueControl.getMessageCount();
+ logger.info("there are {} messages", numberOfMessages);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
+ connection.start();
+ for (int i = 0; i < numberOfMessages; i++) {
+ Message message = consumer.receive(5000);
+ Assert.assertNotNull(message);
+ }
+ }
+
+ File lmFolder = new File(getServerLocation(SERVER_NAME_0) + "/data/large-messages");
+ Assert.assertTrue(lmFolder.exists());
+ Wait.assertEquals(0, () -> lmFolder.listFiles().length);
+
+ }
+
+}
\ No newline at end of file