ARTEMIS-4760 creating MQTT consumer should work if auto-create-queues is false

This commit is contained in:
Justin Bertram 2024-05-03 23:26:34 -05:00 committed by Robbie Gemmell
parent 4daefbf138
commit de0f6ac8f5
4 changed files with 111 additions and 8 deletions

View File

@ -104,8 +104,9 @@ public class MQTTSubscriptionManager {
private void addSubscription(MqttTopicSubscription subscription, Integer subscriptionIdentifier, boolean initialStart) throws Exception {
String rawTopicName = CompositeAddress.extractAddressName(subscription.topicName());
String parsedTopicName = MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
boolean isFullyQualified = CompositeAddress.isFullyQualified(subscription.topicName());
Queue q = createQueueForSubscription(rawTopicName, parsedTopicName);
Queue q = createQueueForSubscription(rawTopicName, parsedTopicName, isFullyQualified);
int qos = subscription.qualityOfService().value();
@ -146,20 +147,20 @@ public class MQTTSubscriptionManager {
}
}
private Queue createQueueForSubscription(String rawTopicName, String parsedTopicName) throws Exception {
private Queue createQueueForSubscription(String rawTopicName, String parsedTopicName, boolean isFullyQualified) throws Exception {
String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName, session.getWildcardConfiguration());
String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName, session.getState().getClientId(), session.getWildcardConfiguration());
// check to see if a subscription queue already exists.
// check to see if a subscription queue already exists
Queue q = session.getServer().locateQueue(coreQueue);
// The queue does not exist so we need to create it.
// the subscription queue does not exist so we need to create it
if (q == null) {
SimpleString sAddress = SimpleString.toSimpleString(coreAddress);
// Check we can auto create queues.
// only check if we can auto create queues if it's FQQN
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress);
if (!bindingQueryResult.isAutoCreateQueues()) {
if (isFullyQualified && !bindingQueryResult.isAutoCreateQueues()) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress);
}
@ -169,8 +170,7 @@ public class MQTTSubscriptionManager {
if (!bindingQueryResult.isAutoCreateAddresses()) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(sAddress);
}
addressInfo = session.getServerSession().createAddress(sAddress,
RoutingType.MULTICAST, true);
addressInfo = session.getServerSession().createAddress(sAddress, RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue);
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class JMSTopicSubscriberTest extends MultiprotocolJMSClientTestSupport {
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected void addConfiguration(ActiveMQServer server) throws Exception {
server.getAddressSettingsRepository().getMatch(getTopicName()).setAutoCreateQueues(false);
}
@Test
@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
public void testCoreSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
Connection connection = createCoreConnection();
testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
}
@Test
@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
public void testOpenWireSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
Connection connection = createOpenWireConnection();
testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
}
@Test
@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
public void testAMQPSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
Connection connection = createConnection();
testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
}
private void testSubscriptionQueueCreatedWhenAutoCreateDisabled(Connection connection) throws Exception {
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = session.createTopic(getTopicName());
assertEquals(0, server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(getTopicName())).size());
session.createConsumer(topic);
Wait.assertEquals(1, () -> server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(getTopicName())).size(), 2000, 100);
} finally {
connection.close();
}
}
}

View File

@ -698,4 +698,19 @@ public class MQTT5Test extends MQTT5TestSupport {
client.disconnect();
client.close();
}
@Test
@Timeout(value = DEFAULT_TIMEOUT, unit = TimeUnit.MILLISECONDS)
public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
final String topic = "a/b";
final String clientID = "myClientID";
server.getAddressSettingsRepository().getMatch(topic).setAutoCreateQueues(false);
MqttClient client = createPahoClient(clientID);
client.connect();
client.subscribe(topic, 1);
Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) != null, 2000, 100);
client.disconnect();
client.close();
}
}

View File

@ -830,6 +830,19 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
@TestTemplate
public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
SimpleString topic = SimpleString.toSimpleString(getTopicPrefix() + getTopicName());
server.getAddressSettingsRepository().getMatch(topic.toString()).setAutoCreateQueues(false);
conn.connect(defUser, defPass);
assertEquals(0, server.getPostOffice().getBindingsForAddress(topic).size());
subscribeTopic(conn, null, null, null, true);
Wait.assertEquals(1, () -> server.getPostOffice().getBindingsForAddress(topic).size(), 2000, 100);
conn.disconnect();
}
@TestTemplate
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
conn.connect(defUser, defPass);