This commit is contained in:
Justin Bertram 2018-01-18 08:59:35 -06:00
commit 8428219901
2 changed files with 26 additions and 7 deletions

View File

@ -24,28 +24,29 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
public class MQTTSubscriptionManager {
private MQTTSession session;
private final MQTTSession session;
private ConcurrentMap<Long, Integer> consumerQoSLevels;
private final ConcurrentMap<Long, Integer> consumerQoSLevels;
private ConcurrentMap<String, ServerConsumer> consumers;
private final ConcurrentMap<String, ServerConsumer> consumers;
// We filter out Artemis management messages and notifications
private SimpleString managementFilter;
private final SimpleString managementFilter;
public MQTTSubscriptionManager(MQTTSession session) {
this.session = session;
@ -108,7 +109,8 @@ public class MQTTSubscriptionManager {
if (!bindingQueryResult.isAutoCreateAddresses()) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
}
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false);
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
}

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -1946,4 +1947,20 @@ public class MQTTTest extends MQTTTestSupport {
connection2.disconnect();
}
}
@Test
public void autoDestroyAddress() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAutoDeleteAddresses(true);
server.getAddressSettingsRepository().addMatch("foo.bar", addressSettings);
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bar", AT_MOST_ONCE);
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
subscriptionProvider.disconnect();
assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
}
}