diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 147c30ba3a..a704073f6b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; @@ -45,7 +46,7 @@ public class MQTTRetainMessageManager { * the subscription queue for the consumer. When a new retained message is received the message will be sent to * the retained queue and the previous retain message consumed to remove it from the queue. */ - void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception { + void handleRetainedMessage(Message messageParameter, String address, boolean reset, Transaction tx) throws Exception { SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration())); Queue queue = session.getServer().locateQueue(retainAddress); @@ -56,6 +57,7 @@ public class MQTTRetainMessageManager { queue.deleteAllReferences(); if (!reset) { + Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, session.getServer().getStorageManager()); sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index dea99ce44c..02eea3335f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -40,6 +40,7 @@ import java.util.regex.Pattern; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -49,6 +50,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -430,6 +432,62 @@ public class MQTTTest extends MQTTTestSupport { publisher.disconnect(); } + @Test(timeout = 60 * 1000) + public void testSendAndReceiveRetainedLargeMessage() throws Exception { + AssertionLoggerHandler.startCapture(); + try { + byte[] payload = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2]; + for (int i = 0; i < payload.length; i++) { + payload[i] = '2'; + } + String body = "message"; + + String smallRetain = "retain"; + final MQTTClientProvider publisher = getMQTTClientProvider(); + initializeConnection(publisher); + + final MQTTClientProvider subscriber = getMQTTClientProvider(); + initializeConnection(subscriber); + + publisher.publish("foo", payload, AT_LEAST_ONCE, true); + + subscriber.subscribe("foo", AT_LEAST_ONCE); + + publisher.publish("foo", body.getBytes(), AT_LEAST_ONCE, false); + byte[] msg = subscriber.receive(5000); + assertNotNull(msg); + assertEquals(msg.length, payload.length); + + msg = subscriber.receive(5000); + assertNotNull(msg); + assertEquals(msg.length, body.length()); + + subscriber.disconnect(); + + final MQTTClientProvider subscriber2 = getMQTTClientProvider(); + initializeConnection(subscriber2); + subscriber2.subscribe("foo", AT_LEAST_ONCE); + msg = subscriber2.receive(5000); + assertNotNull(msg); + assertEquals(msg.length, payload.length); + subscriber2.disconnect(); + publisher.publish("foo", smallRetain.getBytes(), AT_LEAST_ONCE, true); + final MQTTClientProvider subscriber3 = getMQTTClientProvider(); + initializeConnection(subscriber3); + subscriber3.subscribe("foo", AT_LEAST_ONCE); + msg = subscriber3.receive(5000); + assertNotNull(msg); + assertEquals(msg.length, smallRetain.getBytes().length); + subscriber3.disconnect(); + publisher.disconnect(); + + Assert.assertFalse(AssertionLoggerHandler.findText("Exception")); + } finally { + AssertionLoggerHandler.stopCapture(); + } + + } + @Test(timeout = 30 * 1000) public void testValidZeroLengthClientId() throws Exception { MQTT mqtt = createMQTTConnection();