ARTEMIS-1215 Fix address validation on sub #
This commit is contained in:
parent
c023735612
commit
f20703a13e
|
@ -251,7 +251,7 @@ public class SimpleAddressManager implements AddressManager {
|
||||||
if (binding instanceof QueueBinding) {
|
if (binding instanceof QueueBinding) {
|
||||||
final QueueBinding queueBinding = (QueueBinding) binding;
|
final QueueBinding queueBinding = (QueueBinding) binding;
|
||||||
final RoutingType routingType = queueBinding.getQueue().getRoutingType();
|
final RoutingType routingType = queueBinding.getQueue().getRoutingType();
|
||||||
if (!routingTypes.contains(routingType)) {
|
if (!routingTypes.contains(routingType) && binding.getAddress().equals(addressName)) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeDelete(routingType, addressName.toString());
|
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeDelete(routingType, addressName.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,8 @@ import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
|
||||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
|
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
|
||||||
|
@ -1898,6 +1900,41 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
assertEquals(payload, new String(message.getPayload()));
|
assertEquals(payload, new String(message.getPayload()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testBrokerRestartAfterSubHashWithConfigurationQueues() throws Exception {
|
||||||
|
|
||||||
|
// Add some pre configured queues
|
||||||
|
CoreQueueConfiguration coreQueueConfiguration = new CoreQueueConfiguration();
|
||||||
|
coreQueueConfiguration.setName("DLQ");
|
||||||
|
coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST);
|
||||||
|
coreQueueConfiguration.setAddress("DLA");
|
||||||
|
|
||||||
|
CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
|
||||||
|
coreAddressConfiguration.setName("DLA");
|
||||||
|
coreAddressConfiguration.addRoutingType(RoutingType.ANYCAST);
|
||||||
|
coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration);
|
||||||
|
|
||||||
|
getServer().getConfiguration().getAddressConfigurations().add(coreAddressConfiguration);
|
||||||
|
|
||||||
|
getServer().stop();
|
||||||
|
getServer().start();
|
||||||
|
getServer().waitForActivation(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
MQTT mqtt = createMQTTConnection("myClient", false);
|
||||||
|
BlockingConnection connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
connection.subscribe(new Topic[]{new Topic("#", QoS.AT_MOST_ONCE)});
|
||||||
|
connection.disconnect();
|
||||||
|
|
||||||
|
getServer().stop();
|
||||||
|
getServer().start();
|
||||||
|
getServer().waitForActivation(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDuplicateIDReturnsError() throws Exception {
|
public void testDuplicateIDReturnsError() throws Exception {
|
||||||
String clientId = "clientId";
|
String clientId = "clientId";
|
||||||
|
|
Loading…
Reference in New Issue