mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-13 13:35:47 +00:00
ARTEMIS-990 Dont require Perms on MQTT mngment Q
(cherry picked from commit b33fea0d7fbc94a43d04ca66a89880442e0f91c5)
This commit is contained in:
parent
e0cd9aa8b3
commit
2779ad8553
@ -62,10 +62,6 @@ public class MQTTPublishManager {
|
|||||||
synchronized void start() throws Exception {
|
synchronized void start() throws Exception {
|
||||||
this.state = session.getSessionState();
|
this.state = session.getSessionState();
|
||||||
this.outboundStore = state.getOutboundStore();
|
this.outboundStore = state.getOutboundStore();
|
||||||
|
|
||||||
createManagementAddress();
|
|
||||||
createManagementQueue();
|
|
||||||
createManagementConsumer();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void stop() throws Exception {
|
synchronized void stop() throws Exception {
|
||||||
@ -77,7 +73,7 @@ public class MQTTPublishManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void clean() throws Exception {
|
void clean() throws Exception {
|
||||||
createManagementAddress();
|
SimpleString managementAddress = createManagementAddress();
|
||||||
Queue queue = session.getServer().locateQueue(managementAddress);
|
Queue queue = session.getServer().locateQueue(managementAddress);
|
||||||
if (queue != null) {
|
if (queue != null) {
|
||||||
queue.deleteQueue();
|
queue.deleteQueue();
|
||||||
@ -90,14 +86,14 @@ public class MQTTPublishManager {
|
|||||||
managementConsumer.setStarted(true);
|
managementConsumer.setStarted(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createManagementAddress() {
|
private SimpleString createManagementAddress() {
|
||||||
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
|
return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createManagementQueue() throws Exception {
|
private void createManagementQueue() throws Exception {
|
||||||
Queue q = session.getServer().locateQueue(managementAddress);
|
Queue q = session.getServer().locateQueue(managementAddress);
|
||||||
if (q == null) {
|
if (q == null) {
|
||||||
session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
|
session.getServer().createQueue(managementAddress, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,11 +179,20 @@ public class MQTTPublishManager {
|
|||||||
session.getProtocolHandler().sendPubRel(messageId);
|
session.getProtocolHandler().sendPubRel(messageId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private SimpleString getManagementAddress() throws Exception {
|
||||||
|
if (managementAddress == null) {
|
||||||
|
managementAddress = createManagementAddress();
|
||||||
|
createManagementQueue();
|
||||||
|
createManagementConsumer();
|
||||||
|
}
|
||||||
|
return managementAddress;
|
||||||
|
}
|
||||||
|
|
||||||
void handlePubRec(int messageId) throws Exception {
|
void handlePubRec(int messageId) throws Exception {
|
||||||
try {
|
try {
|
||||||
Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
|
Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
|
||||||
if (ref != null) {
|
if (ref != null) {
|
||||||
ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
|
ServerMessage m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
|
||||||
session.getServerSession().send(m, true);
|
session.getServerSession().send(m, true);
|
||||||
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
session.getServerSession().acknowledge(ref.getB(), ref.getA());
|
||||||
} else {
|
} else {
|
||||||
|
@ -49,7 +49,7 @@ public class MQTTRetainMessageManager {
|
|||||||
|
|
||||||
Queue queue = session.getServer().locateQueue(retainAddress);
|
Queue queue = session.getServer().locateQueue(retainAddress);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
|
queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
|
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
|
||||||
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
|
||||||
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
|
||||||
|
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||||
import org.fusesource.mqtt.client.BlockingConnection;
|
import org.fusesource.mqtt.client.BlockingConnection;
|
||||||
import org.fusesource.mqtt.client.MQTT;
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
import org.fusesource.mqtt.client.MQTTException;
|
import org.fusesource.mqtt.client.MQTTException;
|
||||||
@ -53,7 +54,6 @@ import org.junit.Ignore;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.vertx.java.core.impl.ConcurrentHashSet;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* QT
|
* QT
|
||||||
@ -1711,7 +1711,7 @@ public class MQTTTest extends MQTTTestSupport {
|
|||||||
connection2.connect();
|
connection2.connect();
|
||||||
connection2.subscribe(mqttTopic);
|
connection2.subscribe(mqttTopic);
|
||||||
|
|
||||||
Message message = connection2.receive();
|
Message message = connection2.receive(5000, TimeUnit.MILLISECONDS);
|
||||||
assertEquals(payload, new String(message.getPayload()));
|
assertEquals(payload, new String(message.getPayload()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user