diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 53e8cdd341..4f487bfe45 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -218,7 +218,7 @@ public abstract class AbstractRegion implements Region { dest.removeSubscription(context, sub, 0l); } } - destinationMap.removeAll(destination); + destinationMap.remove(destination, dest); dispose(context, dest); DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); if (destinationInterceptor != null) { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index 2031c5d131..fac843bdf9 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -16,33 +16,25 @@ */ package org.apache.activemq.transport.mqtt; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.util.Wait; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.util.Wait; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; public class PahoMQTTTest extends MQTTTestSupport { @@ -162,10 +154,39 @@ public class PahoMQTTTest extends MQTTTestSupport { public boolean isSatisified() throws Exception { return listener.result != null; } - }); + }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); + assertTrue(client.getPendingDeliveryTokens().length == 0); assertEquals(expectedResult, listener.result); + + expectedResult = "should get everything"; + listener.result = null; + client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + client.unsubscribe(ACCOUNT_PREFIX + "a/+/#"); + client.unsubscribe(ACCOUNT_PREFIX + "#"); + assertTrue(client.getPendingDeliveryTokens().length == 0); + + expectedResult = "should still get 1/2/3"; + listener.result = null; + client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return listener.result != null; + } + }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); + assertEquals(expectedResult, listener.result); + assertTrue(client.getPendingDeliveryTokens().length == 0); } @Test(timeout = 300000) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java index 9998be975e..18eb0c767b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/HealthViewMBeanTest.java @@ -66,6 +66,7 @@ public class HealthViewMBeanTest extends EmbeddedBrokerTestSupport { answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 64); answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 64); answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 64); + answer.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 64); answer.setUseJmx(true); answer.setSchedulerSupport(true); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java index 3cfbc647d9..755c3c393c 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/filter/DestinationMapTest.java @@ -337,6 +337,13 @@ public class DestinationMapTest extends TestCase { map.removeAll(createDestination("FOO.>")); assertMapValue("FOO.A", null); + + put("FOO.A", v1); + put("FOO.>", v2); + + map.remove(createDestination("FOO.>"), v2); + + assertMapValue("FOO.A", v1); } protected void loadSample2() { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java index b62fccb90d..1222b3bd18 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/SecurityJMXTest.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; public class SecurityJMXTest extends TestCase { - private static final Logger LOG = LoggerFactory.getLogger(SimpleAuthenticationPluginTest.class); + private static final Logger LOG = LoggerFactory.getLogger(SecurityJMXTest.class); private BrokerService broker; @Override diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml index b6827283a8..cb42730021 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml @@ -43,7 +43,7 @@ - +