ARTEMIS-4155 Fixing deadlock on LargeMessage conversion and retention

This commit is contained in:
Clebert Suconic 2023-02-03 12:03:30 -05:00 committed by clebertsuconic
parent 6e4af114f1
commit ed5f63538e
6 changed files with 536 additions and 2 deletions

View File

@ -878,7 +878,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
private void historyBody(long messageId, EncodingSupport partialBuffer) {
try {
messageJournal.appendAddEvent(messageId, JournalRecordIds.ADD_MESSAGE_BODY, EncoderPersister.getInstance(), partialBuffer, true, null);
messageJournal.appendAddEvent(messageId, JournalRecordIds.ADD_MESSAGE_BODY, EncoderPersister.getInstance(), partialBuffer, false, null);
} catch (Exception e) {
logger.warn("Error processing history large message body for {}", messageId, e);
}

View File

@ -70,7 +70,7 @@ public class ReplayManager {
}
public void replay(Date start, Date end, String sourceAddress, String targetAddressParameter, String filterStr) throws Exception {
logger.debug("Replay::{}", sourceAddress);
logger.debug("Replay start::sourceAddress={}", sourceAddress);
if (sourceAddress == null) {
throw new NullPointerException("sourceAddress");
@ -129,6 +129,7 @@ public class ReplayManager {
continue;
}
}
logger.debug("Reading retention file {}", file);
JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
@Override
public void onReadEventRecord(RecordInfo info) throws Exception {
@ -179,6 +180,8 @@ public class ReplayManager {
}, null, false, null);
}
logger.debug("Replay done::sourceAddress={}", sourceAddress);
}
private boolean messageMatch(Filter filter, Message message, String sourceAddress, String targetAddress) {

View File

@ -219,6 +219,34 @@
</args>
</configuration>
</execution>
<!-- Used on TestRetention -->
<execution>
<phase>test-compile</phase>
<id>create-lmreplay</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<role>amq</role>
<user>artemis</user>
<password>artemis</password>
<allowAnonymous>true</allowAnonymous>
<noWeb>false</noWeb>
<instance>${basedir}/target/replay/large-message</instance>
<configuration>${basedir}/target/classes/servers/replay/large-message</configuration>
<args>
<!-- this is needed to run the server remotely -->
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost</arg>
<arg>--journal-retention</arg>
<arg>1</arg>
<arg>--queues</arg>
<arg>RetentionTest</arg>
<arg>--name</arg>
<arg>large-message</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -0,0 +1,246 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>replay</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-retention-directory period="1" unit="DAYS">./data/retention</journal-retention-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<!-- using a small journal-buffer-size to allow the message to be converted before journal persistence -->
<journal-buffer-size>20k</journal-buffer-size>
<journal-file-size>500K</journal-file-size>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size> -->
<!-- the maximum number of messages accepted before entering full address mode.
if global-max-size is specified the full address mode will be specified by whatever hits it first. -->
<global-max-messages>-1</global-max-messages>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- disabling paging -->
<max-size-messages>-1</max-size-messages>
<!-- the size of each file on paging. Notice we keep files in memory while they are in use.
Lower this setting if you have too many queues in memory. -->
<page-size-bytes>10M</page-size-bytes>
<!-- limit how many messages are read from paging into the Queue. -->
<max-read-page-messages>-1</max-read-page-messages>
<!-- limit how much memory is read from paging into the Queue. -->
<max-read-page-bytes>20M</max-read-page-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
<address name="RetentionTest">
<anycast>
<queue name="RetentionTest" />
</anycast>
</address>
</addresses>
<!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
-->
</core>
</configuration>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<management-context xmlns="http://activemq.apache.org/schema">
<connector connector-port="1099"/>
<authorisation>
<allowlist>
<entry domain="hawtio"/>
</allowlist>
<default-access>
<access method="list*" roles="amq"/>
<access method="get*" roles="amq"/>
<access method="is*" roles="amq"/>
<access method="set*" roles="amq"/>
<access method="*" roles="amq"/>
</default-access>
<role-access>
<match domain="org.apache.activemq.artemis">
<access method="list*" roles="amq"/>
<access method="get*" roles="amq"/>
<access method="is*" roles="amq"/>
<access method="set*" roles="amq"/>
<!-- Note count and browse are need to access the browse tab in the console-->
<access method="browse*" roles="amq"/>
<access method="count*" roles="amq"/>
<access method="*" roles="amq"/>
</match>
<!--example of how to configure a specific object-->
<!--<match domain="org.apache.activemq.artemis" key="subcomponent=queues">
<access method="list*" roles="view,update,amq"/>
<access method="get*" roles="view,update,amq"/>
<access method="is*" roles="view,update,amq"/>
<access method="set*" roles="update,amq"/>
<access method="*" roles="amq"/>
</match>-->
</role-access>
</authorisation>
</management-context>

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.retention;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// The server used by this test has the journal retention configured.
// The server should not enter into a deadlock state just because retention is being used.
// The focus of this test is to make sure all messages are sent and received normally
public class LargeMessageRetentionTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT_0 = 1099;
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "replay", true);
public static final String SERVER_NAME_0 = "replay/large-message";
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
startServer(SERVER_NAME_0, 0, 30000);
disableCheckThread();
}
@Test
public void testRetentionOpenWire() throws Throwable {
testRetention("OPENWIRE", 100, 10, 200 * 1024, 10);
}
@Test
public void testRetentionAMQP() throws Throwable {
testRetention("AMQP", 100, 10, 50 * 1024, 10);
}
@Test
public void testRetentionAMQPRealLarge() throws Throwable {
testRetention("AMQP", 100, 10, 300 * 1024, 10);
}
// in this case messages are not really > min-large-message-size, but they will be converted because of the journal small buffer size
@Test
public void testRetentionCore() throws Throwable {
testRetention("CORE", 100, 10, 50 * 1024, 10);
}
// in this case the messages are actually large
@Test
public void testRetentionCoreRealLarge() throws Throwable {
testRetention("CORE", 100, 10, 300 * 1024, 10);
}
private void testRetention(String protocol, int NUMBER_OF_MESSAGES, int backlog, int bodySize, int producers) throws Throwable {
Assert.assertTrue(NUMBER_OF_MESSAGES % producers == 0); // checking that it is a multiple
ActiveMQServerControl serverControl = getServerControl(liveURI, liveNameBuilder, 5000);
final Semaphore consumerCredits = new Semaphore(-backlog);
final String queueName = "RetentionTest";
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch latchReceiver = new CountDownLatch(1);
final CountDownLatch latchSender = new CountDownLatch(producers);
String bufferStr;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
bufferStr = RandomUtil.randomString() + buffer;
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
ExecutorService executor = Executors.newFixedThreadPool(1 * producers);
runAfter(executor::shutdownNow);
executor.execute(() -> {
try (Connection consumerConnection = factory.createConnection()) {
HashMap<Integer, AtomicInteger> messageSequences = new HashMap<>();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = consumerSession.createQueue(queueName);
consumerConnection.start();
MessageConsumer consumer = consumerSession.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
logger.debug("Acquiring semop at {}", i);
Assert.assertTrue(consumerCredits.tryAcquire(1, TimeUnit.MINUTES));
TextMessage message = (TextMessage) consumer.receive(60_000);
Assert.assertNotNull(message);
int producerI = message.getIntProperty("producerI");
AtomicInteger messageSequence = messageSequences.get(producerI);
if (messageSequence == null) {
messageSequence = new AtomicInteger(0);
messageSequences.put(producerI, messageSequence);
}
Assert.assertEquals(messageSequence.getAndIncrement(), message.getIntProperty("messageI"));
logger.info("Received message {}", i);
Assert.assertEquals(bufferStr, message.getText());
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latchReceiver.countDown();
}
});
for (int producerID = 0; producerID < producers; producerID++) {
int theProducerID = producerID; // to be used within the executor's inner method
executor.execute(() -> {
try (Connection producerConnection = factory.createConnection()) {
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = producerSession.createQueue(queueName);
MessageProducer producer = producerSession.createProducer(queue);
for (int messageI = 0; messageI < NUMBER_OF_MESSAGES / producers; messageI++) {
logger.info("Sending message {} from producerID", messageI, theProducerID);
Message message = producerSession.createTextMessage(bufferStr);
message.setIntProperty("messageI", messageI);
message.setIntProperty("producerI", theProducerID);
producer.send(message);
consumerCredits.release();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latchSender.countDown();
}
});
}
Assert.assertTrue(latchSender.await(10, TimeUnit.MINUTES));
consumerCredits.release(backlog);
Assert.assertTrue(latchReceiver.await(10, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
try (Connection consumerConnection = factory.createConnection()) {
HashMap<Integer, AtomicInteger> messageSequences = new HashMap<>();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = consumerSession.createQueue(queueName);
consumerConnection.start();
MessageConsumer consumer = consumerSession.createConsumer(queue);
Assert.assertNull(consumer.receiveNoWait());
serverControl.replay(queueName, queueName, "producerI=0 AND messageI>=0 AND messageI<10");
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(300_000);
Assert.assertNotNull(message);
logger.info("Received replay message {}", i);
Assert.assertEquals(0, message.getIntProperty("producerI"));
Assert.assertEquals(i, message.getIntProperty("messageI"));
Assert.assertEquals(bufferStr, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}