This commit is contained in:
Dejan Bosanac 2015-02-18 18:29:05 +01:00
parent 141ad4cb8f
commit 05c3112402
6 changed files with 197 additions and 46 deletions

View File

@ -48,32 +48,34 @@ public class MappedQueueFilter extends DestinationFilter {
// recover messages for first consumer only
boolean noSubs = getConsumers().isEmpty();
super.addSubscription(context, sub);
if (!sub.getActiveMQDestination().isPattern() || sub.getActiveMQDestination().equals(next.getActiveMQDestination())) {
super.addSubscription(context, sub);
if (noSubs && !getConsumers().isEmpty()) {
// new subscription added, recover retroactive messages
final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
if (noSubs && !getConsumers().isEmpty()) {
// new subscription added, recover retroactive messages
final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
for (Destination virtualDest : virtualDests) {
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
for (Destination virtualDest : virtualDests) {
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
Topic topic = (Topic) getBaseDestination(virtualDest);
if (topic != null) {
// re-use browse() to get recovered messages
final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
Topic topic = (Topic) getBaseDestination(virtualDest);
if (topic != null) {
// re-use browse() to get recovered messages
final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
// add recovered messages to subscription
for (Message message : messages) {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
// add recovered messages to subscription
for (Message message : messages) {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
}
}
}
}
@ -99,4 +101,9 @@ public class MappedQueueFilter extends DestinationFilter {
public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
super.deleteSubscription(context, key);
}
@Override
public String toString() {
return "MappedQueueFilter[" + virtualDestination + ", " + next + "]";
}
}

View File

@ -91,10 +91,11 @@ public class VirtualTopic implements VirtualDestination {
@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) {
if (destination.isQueue() && destination.isPattern()) {
DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
if (filter.matches(destination)) {
broker.addDestination(context, destination, false);
}
}
}

View File

@ -112,10 +112,15 @@ public class DestinationMapNode implements DestinationNode {
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void removeDesendentValues(Set answer) {
ArrayList<DestinationNode> candidates = new ArrayList<>();
for (Map.Entry<String, DestinationNode> child : childNodes.entrySet()) {
candidates.add(child.getValue());
}
for (DestinationNode node : candidates) {
// remove all the values from the child
answer.addAll(child.getValue().removeValues());
answer.addAll(child.getValue().removeDesendentValues());
answer.addAll(node.removeValues());
answer.addAll(node.removeDesendentValues());
}
}

View File

@ -16,26 +16,22 @@
*/
package org.apache.activemq.transport.mqtt;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@ -43,13 +39,6 @@ public class PahoMQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
@Override
@Before
public void setUp() throws Exception {
protocolConfig = "transport.activeMQSubscriptionPrefetch=32766";
super.setUp();
}
@Test(timeout = 300000)
public void testLotsOfClients() throws Exception {
@ -140,6 +129,142 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.close();
}
@Test(timeout = 300000)
public void testSubs() throws Exception {
stopBroker();
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
startBroker();
final DefaultListener listener = new DefaultListener();
// subscriber connects and creates durable sub
MqttClient client = createClient(false, "receive", listener);
final String ACCOUNT_PREFIX = "test/";
client.subscribe(ACCOUNT_PREFIX+"1/2/3");
client.subscribe(ACCOUNT_PREFIX+"a/+/#");
client.subscribe(ACCOUNT_PREFIX+"#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "should get everything";
client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertTrue(client.getPendingDeliveryTokens().length == 0);
assertEquals(expectedResult, listener.result);
}
@Test(timeout=300000)
public void testOverlappingTopics() throws Exception {
stopBroker();
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
startBroker();
final DefaultListener listener = new DefaultListener();
// subscriber connects and creates durable sub
MqttClient client = createClient(false, "receive", listener);
final String ACCOUNT_PREFIX = "test/";
// *****************************************
// check a simple # subscribe works
// *****************************************
client.subscribe(ACCOUNT_PREFIX+"#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
String expectedResult = "hello mqtt broker on hash";
client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on a different topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"1/2/3/4/5/6", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
// *****************************************
// now subscribe on a topic that overlaps the root # wildcard - we should still get everything
// *****************************************
client.subscribe(ACCOUNT_PREFIX+"1/2/3");
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on explicit topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"1/2/3", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "hello mqtt broker on some other topic";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"a/b/c/d/e", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertEquals(expectedResult, listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
// *****************************************
// now unsub hash - we should only get called back on 1/2/3
// *****************************************
client.unsubscribe(ACCOUNT_PREFIX+"#");
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back...";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"1/2/3/4", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
expectedResult = "this should not come back either...";
listener.result = null;
client.publish(ACCOUNT_PREFIX+"a/b/c", expectedResult.getBytes(), 0, false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.result != null;
}
});
assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0);
}
@Test(timeout = 300000)
public void testCleanSession() throws Exception {
String topic = "test";
@ -237,6 +362,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
static class DefaultListener implements MqttCallback {
int received = 0;
String result;
@Override
public void connectionLost(Throwable cause) {
@ -247,6 +373,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public void messageArrived(String topic, MqttMessage message) throws Exception {
LOG.info("Received: " + message);
received++;
result = new String(message.getPayload());
}
@Override

View File

@ -330,6 +330,15 @@ public class DestinationMapTest extends TestCase {
assertMapValue("FOO.>", v2);
}
public void testRemoveWildcard() throws Exception {
put("FOO.A", v1);
put("FOO.>", v2);
map.removeAll(createDestination("FOO.>"));
assertMapValue("FOO.A", null);
}
protected void loadSample2() {
put("TEST.FOO", v1);
put("TEST.*", v2);

View File

@ -65,8 +65,10 @@ public class SingleBrokerVirtualDestinationsWithWildcardTest extends JmsMultiple
sendReceive("local.test.1", true, "Consumer.a.local.test.1", false, 1, 1);
sendReceive("local.test.1", true, "Consumer.a.local.test.>", false, 1, 1);
sendReceive("local.test.1.2", true, "Consumer.a.local.test.>", false, 1, 1);
sendReceive("global.test.1", true, "Consumer.a.global.test.1", false, 1, 1);
sendReceive("global.test.1", true, "Consumer.a.global.test.>", false, 1, 1);
sendReceive("global.test.1.2", true, "Consumer.a.global.test.>", false, 1, 1);
destroyAllBrokers();
}