https://issues.apache.org/jira/browse/AMQ-5594 - mqtt and virtual topic subs; more refined removing of destinations, as we don't want to remove all descendant destination in a wildcard case

This commit is contained in:
Dejan Bosanac 2015-03-02 14:40:07 +01:00
parent adef03e5a4
commit 4f57744934
6 changed files with 53 additions and 24 deletions

View File

@ -218,7 +218,7 @@ public abstract class AbstractRegion implements Region {
dest.removeSubscription(context, sub, 0l); dest.removeSubscription(context, sub, 0l);
} }
} }
destinationMap.removeAll(destination); destinationMap.remove(destination, dest);
dispose(context, dest); dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) { if (destinationInterceptor != null) {

View File

@ -16,33 +16,25 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch; import org.apache.activemq.ActiveMQConnection;
import java.util.concurrent.TimeUnit; import org.apache.activemq.util.Wait;
import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.paho.client.mqttv3.*;
import java.util.concurrent.atomic.AtomicReference; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.ActiveMQConnection; import static org.junit.Assert.*;
import org.apache.activemq.util.Wait;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PahoMQTTTest extends MQTTTestSupport { public class PahoMQTTTest extends MQTTTestSupport {
@ -162,10 +154,39 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return listener.result != null; return listener.result != null;
} }
}); }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, listener.result); assertEquals(expectedResult, listener.result);
expectedResult = "should get everything";
listener.result = null;
client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
client.unsubscribe(ACCOUNT_PREFIX + "a/+/#");
client.unsubscribe(ACCOUNT_PREFIX + "#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "should still get 1/2/3";
listener.result = null;
client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
} }
@Test(timeout = 300000) @Test(timeout = 300000)

View File

@ -66,6 +66,7 @@ public class HealthViewMBeanTest extends EmbeddedBrokerTestSupport {
answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64); answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64);
answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64); answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64);
answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64); answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64);
answer.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 64);
answer.setUseJmx(true); answer.setUseJmx(true);
answer.setSchedulerSupport(true); answer.setSchedulerSupport(true);

View File

@ -337,6 +337,13 @@ public class DestinationMapTest extends TestCase {
map.removeAll(createDestination("FOO.>")); map.removeAll(createDestination("FOO.>"));
assertMapValue("FOO.A", null); assertMapValue("FOO.A", null);
put("FOO.A", v1);
put("FOO.>", v2);
map.remove(createDestination("FOO.>"), v2);
assertMapValue("FOO.A", v1);
} }
protected void loadSample2() { protected void loadSample2() {

View File

@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class SecurityJMXTest extends TestCase { public class SecurityJMXTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(SimpleAuthenticationPluginTest.class); private static final Logger LOG = LoggerFactory.getLogger(SecurityJMXTest.class);
private BrokerService broker; private BrokerService broker;
@Override @Override

View File

@ -43,7 +43,7 @@
<!-- Use a non-default port in case the default port is in use --> <!-- Use a non-default port in case the default port is in use -->
<managementContext> <managementContext>
<managementContext connectorPort="1199"/> <managementContext connectorPort="1199" createConnector="true"/>
</managementContext> </managementContext>
<destinationPolicy> <destinationPolicy>