This commit is contained in:
Clebert Suconic 2021-08-19 10:18:22 -04:00
commit cf312d5b7d
2 changed files with 61 additions and 1 deletions

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -45,7 +46,7 @@ public class MQTTRetainMessageManager {
* the subscription queue for the consumer. When a new retained message is received the message will be sent to * the subscription queue for the consumer. When a new retained message is received the message will be sent to
* the retained queue and the previous retain message consumed to remove it from the queue. * the retained queue and the previous retain message consumed to remove it from the queue.
*/ */
void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception { void handleRetainedMessage(Message messageParameter, String address, boolean reset, Transaction tx) throws Exception {
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration())); SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(retainAddress); Queue queue = session.getServer().locateQueue(retainAddress);
@ -56,6 +57,7 @@ public class MQTTRetainMessageManager {
queue.deleteAllReferences(); queue.deleteAllReferences();
if (!reset) { if (!reset) {
Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, session.getServer().getStorageManager());
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx); sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
} }

View File

@ -40,6 +40,7 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
@ -49,6 +50,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -430,6 +432,62 @@ public class MQTTTest extends MQTTTestSupport {
publisher.disconnect(); publisher.disconnect();
} }
@Test(timeout = 60 * 1000)
public void testSendAndReceiveRetainedLargeMessage() throws Exception {
AssertionLoggerHandler.startCapture();
try {
byte[] payload = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2];
for (int i = 0; i < payload.length; i++) {
payload[i] = '2';
}
String body = "message";
String smallRetain = "retain";
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
final MQTTClientProvider subscriber = getMQTTClientProvider();
initializeConnection(subscriber);
publisher.publish("foo", payload, AT_LEAST_ONCE, true);
subscriber.subscribe("foo", AT_LEAST_ONCE);
publisher.publish("foo", body.getBytes(), AT_LEAST_ONCE, false);
byte[] msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(msg.length, payload.length);
msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(msg.length, body.length());
subscriber.disconnect();
final MQTTClientProvider subscriber2 = getMQTTClientProvider();
initializeConnection(subscriber2);
subscriber2.subscribe("foo", AT_LEAST_ONCE);
msg = subscriber2.receive(5000);
assertNotNull(msg);
assertEquals(msg.length, payload.length);
subscriber2.disconnect();
publisher.publish("foo", smallRetain.getBytes(), AT_LEAST_ONCE, true);
final MQTTClientProvider subscriber3 = getMQTTClientProvider();
initializeConnection(subscriber3);
subscriber3.subscribe("foo", AT_LEAST_ONCE);
msg = subscriber3.receive(5000);
assertNotNull(msg);
assertEquals(msg.length, smallRetain.getBytes().length);
subscriber3.disconnect();
publisher.disconnect();
Assert.assertFalse(AssertionLoggerHandler.findText("Exception"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Test(timeout = 30 * 1000) @Test(timeout = 30 * 1000)
public void testValidZeroLengthClientId() throws Exception { public void testValidZeroLengthClientId() throws Exception {
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();