ARTEMIS-917 Only return body of retained message after reboot

This commit is contained in:
Martyn Taylor 2017-02-05 19:20:32 +00:00 committed by Justin Bertram
parent 36aef22d02
commit 3900cb0ec7
3 changed files with 37 additions and 1 deletions

View File

@ -162,8 +162,10 @@ public abstract class MessageImpl implements MessageInternal {
buffer.setIndex(other.buffer.readerIndex(), buffer.capacity());
bodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, this);
bodyBuffer.readerIndex(BODY_OFFSET);
bodyBuffer.writerIndex(other.getBodyBuffer().writerIndex());
endOfBodyPosition = other.endOfBodyPosition;
}
}
}

View File

@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@ -227,7 +228,8 @@ public class MQTTPublishManager {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
}
default:
payload = message.getBodyBufferDuplicate().byteBuf();
ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
break;
}
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);

View File

@ -1746,4 +1746,36 @@ public class MQTTTest extends MQTTTestSupport {
assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
}
@Test
public void testRetainedMessagesAreCorrectlyFormedAfterRestart() throws Exception {
String clientId = "testMqtt";
String address = "testAddress";
String payload = "This is a test message";
// Create address
getServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST));
// Send MQTT Retain Message
Topic[] mqttTopic = new Topic[]{new Topic(address, QoS.AT_LEAST_ONCE)};
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
BlockingConnection connection1 = mqtt.blockingConnection();
connection1.connect();
connection1.publish(address, payload.getBytes(), QoS.AT_LEAST_ONCE, true);
getServer().stop(false);
getServer().start();
waitForServerToStart(getServer());
MQTT mqtt2 = createMQTTConnection();
mqtt2.setClientId(clientId + "2");
BlockingConnection connection2 = mqtt2.blockingConnection();
connection2.connect();
connection2.subscribe(mqttTopic);
Message message = connection2.receive();
assertEquals(payload, new String(message.getPayload()));
}
}