ARTEMIS-3426 - fix copying of large retained message
https://issues.apache.org/jira/browse/ARTEMIS-3426
This commit is contained in:
parent
52f429afa1
commit
364d4cc324
|
@ -20,6 +20,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
|
||||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
@ -45,7 +46,7 @@ public class MQTTRetainMessageManager {
|
||||||
* the subscription queue for the consumer. When a new retained message is received the message will be sent to
|
* the subscription queue for the consumer. When a new retained message is received the message will be sent to
|
||||||
* the retained queue and the previous retain message consumed to remove it from the queue.
|
* the retained queue and the previous retain message consumed to remove it from the queue.
|
||||||
*/
|
*/
|
||||||
void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception {
|
void handleRetainedMessage(Message messageParameter, String address, boolean reset, Transaction tx) throws Exception {
|
||||||
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
|
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration()));
|
||||||
|
|
||||||
Queue queue = session.getServer().locateQueue(retainAddress);
|
Queue queue = session.getServer().locateQueue(retainAddress);
|
||||||
|
@ -56,6 +57,7 @@ public class MQTTRetainMessageManager {
|
||||||
queue.deleteAllReferences();
|
queue.deleteAllReferences();
|
||||||
|
|
||||||
if (!reset) {
|
if (!reset) {
|
||||||
|
Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, session.getServer().getStorageManager());
|
||||||
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
|
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ import java.util.regex.Pattern;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
|
@ -49,6 +50,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
@ -430,6 +432,62 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
publisher.disconnect();
|
publisher.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testSendAndReceiveRetainedLargeMessage() throws Exception {
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
try {
|
||||||
|
byte[] payload = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2];
|
||||||
|
for (int i = 0; i < payload.length; i++) {
|
||||||
|
payload[i] = '2';
|
||||||
|
}
|
||||||
|
String body = "message";
|
||||||
|
|
||||||
|
String smallRetain = "retain";
|
||||||
|
final MQTTClientProvider publisher = getMQTTClientProvider();
|
||||||
|
initializeConnection(publisher);
|
||||||
|
|
||||||
|
final MQTTClientProvider subscriber = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriber);
|
||||||
|
|
||||||
|
publisher.publish("foo", payload, AT_LEAST_ONCE, true);
|
||||||
|
|
||||||
|
subscriber.subscribe("foo", AT_LEAST_ONCE);
|
||||||
|
|
||||||
|
publisher.publish("foo", body.getBytes(), AT_LEAST_ONCE, false);
|
||||||
|
byte[] msg = subscriber.receive(5000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals(msg.length, payload.length);
|
||||||
|
|
||||||
|
msg = subscriber.receive(5000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals(msg.length, body.length());
|
||||||
|
|
||||||
|
subscriber.disconnect();
|
||||||
|
|
||||||
|
final MQTTClientProvider subscriber2 = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriber2);
|
||||||
|
subscriber2.subscribe("foo", AT_LEAST_ONCE);
|
||||||
|
msg = subscriber2.receive(5000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals(msg.length, payload.length);
|
||||||
|
subscriber2.disconnect();
|
||||||
|
publisher.publish("foo", smallRetain.getBytes(), AT_LEAST_ONCE, true);
|
||||||
|
final MQTTClientProvider subscriber3 = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriber3);
|
||||||
|
subscriber3.subscribe("foo", AT_LEAST_ONCE);
|
||||||
|
msg = subscriber3.receive(5000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals(msg.length, smallRetain.getBytes().length);
|
||||||
|
subscriber3.disconnect();
|
||||||
|
publisher.disconnect();
|
||||||
|
|
||||||
|
Assert.assertFalse(AssertionLoggerHandler.findText("Exception"));
|
||||||
|
} finally {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 30 * 1000)
|
@Test(timeout = 30 * 1000)
|
||||||
public void testValidZeroLengthClientId() throws Exception {
|
public void testValidZeroLengthClientId() throws Exception {
|
||||||
MQTT mqtt = createMQTTConnection();
|
MQTT mqtt = createMQTTConnection();
|
||||||
|
|
Loading…
Reference in New Issue