ARTEMIS-1617 - Properly set autoCreated flag on address

Flag needs to be set when auto creating an address so that the address
can be removed later if auto delete is configured when creating a
subscription with MQTT
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-01-18 08:36:07 -05:00 committed by Justin Bertram
parent 0d9a114a96
commit 3aef7caac6
2 changed files with 26 additions and 7 deletions

View File

@ -24,28 +24,29 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
public class MQTTSubscriptionManager {
private MQTTSession session;
private final MQTTSession session;
private ConcurrentMap<Long, Integer> consumerQoSLevels;
private final ConcurrentMap<Long, Integer> consumerQoSLevels;
private ConcurrentMap<String, ServerConsumer> consumers;
private final ConcurrentMap<String, ServerConsumer> consumers;
// We filter out Artemis management messages and notifications
private SimpleString managementFilter;
private final SimpleString managementFilter;
public MQTTSubscriptionManager(MQTTSession session) {
this.session = session;
@ -108,7 +109,8 @@ public class MQTTSubscriptionManager {
if (!bindingQueryResult.isAutoCreateAddresses()) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
}
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false);
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
}

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -1946,4 +1947,20 @@ public class MQTTTest extends MQTTTestSupport {
connection2.disconnect();
}
}
@Test
public void autoDestroyAddress() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoDeleteAddresses(true);
server.getAddressSettingsRepository().addMatch("foo.bar", addressSettings);
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bar", AT_MOST_ONCE);
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
subscriptionProvider.disconnect();
assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
}
}