ARTEMIS-4646 Reuse only acknowledged MQTT IDs

Generate MQTT message IDs from full allowed range of 1-65535 and skip
currently used values. Do not use atomic integer for current ID, because
all accesses and modifications are performed in synchronized context.
This commit is contained in:
Albertas Vyšniauskas 2024-02-20 22:47:15 +02:00 committed by Justin Bertram
parent 4c0ed67897
commit f4b59c9b25
2 changed files with 69 additions and 9 deletions

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import io.netty.buffer.ByteBuf;
@ -444,14 +443,13 @@ public class MQTTSessionState {
}
public class OutboundStore {
private HashMap<Pair<Long, Long>, Integer> artemisToMqttMessageMap = new HashMap<>();
private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
private final Object dataStoreLock = new Object();
private final AtomicInteger ids = new AtomicInteger(0);
private int currentId = 0;
private Pair<Long, Long> generateKey(long messageId, long consumerID) {
return new Pair<>(messageId, consumerID);
@ -461,8 +459,14 @@ public class MQTTSessionState {
synchronized (dataStoreLock) {
Integer id = artemisToMqttMessageMap.get(generateKey(messageId, consumerId));
if (id == null) {
ids.compareAndSet(Short.MAX_VALUE, 1);
id = ids.addAndGet(1);
do {
if (currentId == MQTTUtil.TWO_BYTE_INT_MAX) {
currentId = 0;
}
++currentId;
}
while (mqttToServerIds.containsKey(currentId));
id = currentId;
}
return id;
}
@ -478,9 +482,8 @@ public class MQTTSessionState {
public Pair<Long, Long> publishAckd(int mqtt) {
synchronized (dataStoreLock) {
Pair p = mqttToServerIds.remove(mqtt);
Pair<Long, Long> p = mqttToServerIds.remove(mqtt);
if (p != null) {
mqttToServerIds.remove(p.getA());
artemisToMqttMessageMap.remove(p);
}
return p;
@ -511,7 +514,7 @@ public class MQTTSessionState {
synchronized (dataStoreLock) {
artemisToMqttMessageMap.clear();
mqttToServerIds.clear();
ids.set(0);
currentId = 0;
}
}
}

View File

@ -423,7 +423,7 @@ public class MQTTTest extends MQTTTestSupport {
@Ignore
@Test(timeout = 600 * 1000)
public void testSendMoreThanUniqueId() throws Exception {
int messages = (Short.MAX_VALUE * 2) + 1;
int messages = MQTTUtil.TWO_BYTE_INT_MAX;
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
@ -447,6 +447,63 @@ public class MQTTTest extends MQTTTestSupport {
publisher.disconnect();
}
@Test(timeout = 600 * 1000)
public void testNoMessageIdReuseBeforeAcknowledgment() throws Exception {
final int messages = MQTTUtil.TWO_BYTE_INT_MAX;
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
final MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
final short[] messageId = new short[1];
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
if (frame.messageType() == PUBLISH.TYPE) {
try {
PUBLISH publish = new PUBLISH();
publish.decode(frame);
messageId[0] = publish.messageId();
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
}
}
});
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[]{new Topic("foo", QoS.EXACTLY_ONCE)});
publisher.publish("foo", "First Message".getBytes(), EXACTLY_ONCE);
final Message firstMessage = connection.receive(5000, TimeUnit.MILLISECONDS);
final short firstMessageId = messageId[0];
publisher.publish("foo", "Second Message".getBytes(), EXACTLY_ONCE);
final Message secondMessage = connection.receive(5000, TimeUnit.MILLISECONDS);
final short secondMessageId = messageId[0];
int count = 0;
for (int i = 0; i < messages; i++) {
String payload = "Test Message: " + i;
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
Message message = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("Should get a message + [" + i + "]", message);
assertEquals(payload, new String(message.getPayload()));
assertFalse("Message ID must not be reused until previous message acknowledgment", firstMessageId == messageId[0] || secondMessageId == messageId[0]);
message.ack();
count++;
}
firstMessage.ack();
secondMessage.ack();
assertEquals(messages, count);
connection.unsubscribe(new String[]{"foo"});
connection.disconnect();
publisher.disconnect();
}
@Test(timeout = 60 * 1000)
public void testSendAndReceiveLargeMessages() throws Exception {
byte[] payload = new byte[1024 * 32];