Allow virtual destination to recover retained messages.
This commit is contained in:
Timothy Bish 2014-07-29 17:20:25 -04:00
parent b11fc8faf4
commit f55edcfa25
10 changed files with 477 additions and 36 deletions

View File

@ -406,4 +406,15 @@ public class DestinationFilter implements Destination {
public Destination getNext() { public Destination getNext() {
return next; return next;
} }
public <T> T getAdaptor(Class <? extends T> clazz) {
if (clazz.isInstance(this)) {
return clazz.cast(this);
} else if (next != null && clazz.isInstance(next)) {
return clazz.cast(next);
} else if (next instanceof DestinationFilter) {
return ((DestinationFilter)next).getAdaptor(clazz);
}
return null;
}
} }

View File

@ -22,10 +22,11 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.CommandTypes;
/** /**
* *
* *
*/ */
public abstract class CompositeDestination implements VirtualDestination { public abstract class CompositeDestination implements VirtualDestination {
@ -35,14 +36,17 @@ public abstract class CompositeDestination implements VirtualDestination {
private boolean copyMessage = true; private boolean copyMessage = true;
private boolean concurrentSend = false; private boolean concurrentSend = false;
@Override
public Destination intercept(Destination destination) { public Destination intercept(Destination destination) {
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage(), isConcurrentSend()); return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage(), isConcurrentSend());
} }
@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) { public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {
} }
public void remove(Destination destination) { @Override
public void remove(Destination destination) {
} }
public String getName() { public String getName() {
@ -104,4 +108,39 @@ public abstract class CompositeDestination implements VirtualDestination {
return this.concurrentSend; return this.concurrentSend;
} }
@Override
public ActiveMQDestination getMappedDestinations() {
final ActiveMQDestination[] destinations = new ActiveMQDestination[forwardTo.size()];
int i = 0;
for (Object dest : forwardTo) {
if (dest instanceof FilteredDestination) {
FilteredDestination filteredDestination = (FilteredDestination) dest;
destinations[i++] = filteredDestination.getDestination();
} else if (dest instanceof ActiveMQDestination) {
destinations[i++] = (ActiveMQDestination) dest;
} else {
// highly unlikely, but just in case!
throw new IllegalArgumentException("Unknown mapped destination type " + dest);
}
}
// used just for matching destination paths
return new ActiveMQDestination(destinations) {
@Override
protected String getQualifiedPrefix() {
return "mapped://";
}
@Override
public byte getDestinationType() {
return QUEUE_TYPE | TOPIC_TYPE;
}
@Override
public byte getDataStructureType() {
return CommandTypes.ACTIVEMQ_QUEUE | CommandTypes.ACTIVEMQ_TOPIC;
}
};
}
} }

View File

@ -22,14 +22,20 @@ import org.apache.activemq.command.ActiveMQQueue;
/** /**
* Represents a virtual queue which forwards to a number of other destinations. * Represents a virtual queue which forwards to a number of other destinations.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*
*/ */
public class CompositeQueue extends CompositeDestination { public class CompositeQueue extends CompositeDestination {
@Override
public ActiveMQDestination getVirtualDestination() { public ActiveMQDestination getVirtualDestination() {
return new ActiveMQQueue(getName()); return new ActiveMQQueue(getName());
} }
@Override
public Destination interceptMappedDestination(Destination destination) {
// nothing to do for mapped destinations
return destination;
}
} }

View File

@ -16,19 +16,29 @@
*/ */
package org.apache.activemq.broker.region.virtual; package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
/** /**
* Represents a virtual topic which forwards to a number of other destinations. * Represents a virtual topic which forwards to a number of other destinations.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*
*/ */
public class CompositeTopic extends CompositeDestination { public class CompositeTopic extends CompositeDestination {
@Override
public ActiveMQDestination getVirtualDestination() { public ActiveMQDestination getVirtualDestination() {
return new ActiveMQTopic(getName()); return new ActiveMQTopic(getName());
} }
@Override
public Destination interceptMappedDestination(Destination destination) {
if (!isForwardOnly() && destination.getActiveMQDestination().isQueue()) {
// recover retroactive messages in mapped Queue
return new MappedQueueFilter(getVirtualDestination(), destination);
}
return destination;
}
} }

View File

@ -0,0 +1,86 @@
package org.apache.activemq.broker.region.virtual;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.SubscriptionKey;
/**
* Creates a mapped Queue that can recover messages from subscription recovery
* policy of its Virtual Topic.
*/
public class MappedQueueFilter extends DestinationFilter {
private final ActiveMQDestination virtualDestination;
public MappedQueueFilter(ActiveMQDestination virtualDestination, Destination destination) {
super(destination);
this.virtualDestination = virtualDestination;
}
@Override
public synchronized void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// recover messages for first consumer only
boolean noSubs = getConsumers().isEmpty();
super.addSubscription(context, sub);
if (noSubs && !getConsumers().isEmpty()) {
// new subscription added, recover retroactive messages
final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
final ActiveMQDestination newDestination = sub.getActiveMQDestination();
final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
for (Destination virtualDest : virtualDests) {
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
Topic topic = (Topic) getBaseDestination(virtualDest);
if (topic != null) {
// re-use browse() to get recovered messages
final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
// add recovered messages to subscription
for (Message message : messages) {
final Message copy = message.copy();
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
}
}
}
}
}
}
private BaseDestination getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return (BaseDestination) virtualDest;
} else if (virtualDest instanceof DestinationFilter) {
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
}
return null;
}
@Override
public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
}
@Override
public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
super.deleteSubscription(context, key);
}
}

View File

@ -22,8 +22,6 @@ import org.apache.activemq.command.ActiveMQDestination;
/** /**
* Represents some kind of virtual destination. * Represents some kind of virtual destination.
*
*
*/ */
public interface VirtualDestination extends DestinationInterceptor { public interface VirtualDestination extends DestinationInterceptor {
@ -35,5 +33,17 @@ public interface VirtualDestination extends DestinationInterceptor {
/** /**
* Creates a virtual destination from the physical destination * Creates a virtual destination from the physical destination
*/ */
@Override
Destination intercept(Destination destination); Destination intercept(Destination destination);
/**
* Returns mapped destination(s)
*/
ActiveMQDestination getMappedDestinations();
/**
* Creates a mapped destination
*/
Destination interceptMappedDestination(Destination destination);
} }

View File

@ -34,21 +34,26 @@ import org.apache.activemq.filter.DestinationMap;
/** /**
* Implements <a * Implements <a
* href="http://activemq.apache.org/virtual-destinations.html">Virtual Topics</a>. * href="http://activemq.apache.org/virtual-destinations.html">Virtual
* * Topics</a>.
*
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
*/ */
public class VirtualDestinationInterceptor implements DestinationInterceptor { public class VirtualDestinationInterceptor implements DestinationInterceptor {
private DestinationMap destinationMap = new DestinationMap(); private DestinationMap destinationMap = new DestinationMap();
private DestinationMap mappedDestinationMap = new DestinationMap();
private VirtualDestination[] virtualDestinations; private VirtualDestination[] virtualDestinations;
@Override
public Destination intercept(Destination destination) { public Destination intercept(Destination destination) {
Set matchingDestinations = destinationMap.get(destination.getActiveMQDestination()); final ActiveMQDestination activeMQDestination = destination.getActiveMQDestination();
Set matchingDestinations = destinationMap.get(activeMQDestination);
List<Destination> destinations = new ArrayList<Destination>(); List<Destination> destinations = new ArrayList<Destination>();
for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) { for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) {
VirtualDestination virtualDestination = (VirtualDestination)iter.next(); VirtualDestination virtualDestination = (VirtualDestination) iter.next();
Destination newDestination = virtualDestination.intercept(destination); Destination newDestination = virtualDestination.intercept(destination);
destinations.add(newDestination); destinations.add(newDestination);
} }
@ -60,17 +65,28 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
return createCompositeDestination(destination, destinations); return createCompositeDestination(destination, destinations);
} }
} }
// check if the destination instead matches any mapped destinations
Set mappedDestinations = mappedDestinationMap.get(activeMQDestination);
assert mappedDestinations.size() < 2;
if (!mappedDestinations.isEmpty()) {
// create a mapped destination interceptor
VirtualDestination virtualDestination = (VirtualDestination)
mappedDestinations.toArray(new VirtualDestination[mappedDestinations.size()])[0];
return virtualDestination.interceptMappedDestination(destination);
}
return destination; return destination;
} }
@Override
public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
for (VirtualDestination virt: virtualDestinations) { for (VirtualDestination virt : virtualDestinations) {
virt.create(broker, context, destination); virt.create(broker, context, destination);
} }
} }
public synchronized void remove(Destination destination) { @Override
public synchronized void remove(Destination destination) {
} }
public VirtualDestination[] getVirtualDestinations() { public VirtualDestination[] getVirtualDestinations() {
@ -79,15 +95,18 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
public void setVirtualDestinations(VirtualDestination[] virtualDestinations) { public void setVirtualDestinations(VirtualDestination[] virtualDestinations) {
destinationMap = new DestinationMap(); destinationMap = new DestinationMap();
mappedDestinationMap = new DestinationMap();
this.virtualDestinations = virtualDestinations; this.virtualDestinations = virtualDestinations;
for (int i = 0; i < virtualDestinations.length; i++) { for (int i = 0; i < virtualDestinations.length; i++) {
VirtualDestination virtualDestination = virtualDestinations[i]; VirtualDestination virtualDestination = virtualDestinations[i];
destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination); destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination);
mappedDestinationMap.put(virtualDestination.getMappedDestinations(), virtualDestination);
} }
} }
protected Destination createCompositeDestination(Destination destination, final List<Destination> destinations) { protected Destination createCompositeDestination(Destination destination, final List<Destination> destinations) {
return new DestinationFilter(destination) { return new DestinationFilter(destination) {
@Override
public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
for (Iterator<Destination> iter = destinations.iterator(); iter.hasNext();) { for (Iterator<Destination> iter = destinations.iterator(); iter.hasNext();) {
Destination destination = iter.next(); Destination destination = iter.next();

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.broker.region.virtual; package org.apache.activemq.broker.region.virtual;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
@ -29,10 +32,8 @@ import org.apache.activemq.filter.DestinationFilter;
* Topics</a> using a prefix and postfix. The virtual destination creates a * Topics</a> using a prefix and postfix. The virtual destination creates a
* wildcard that is then used to look up all active queue subscriptions which * wildcard that is then used to look up all active queue subscriptions which
* match. * match.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class VirtualTopic implements VirtualDestination { public class VirtualTopic implements VirtualDestination {
@ -42,17 +43,53 @@ public class VirtualTopic implements VirtualDestination {
private boolean selectorAware = false; private boolean selectorAware = false;
private boolean local = false; private boolean local = false;
@Override
public ActiveMQDestination getVirtualDestination() { public ActiveMQDestination getVirtualDestination() {
return new ActiveMQTopic(getName()); return new ActiveMQTopic(getName());
} }
@Override
public Destination intercept(Destination destination) { public Destination intercept(Destination destination) {
return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor(
new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()); destination, getPrefix(), getPostfix(), isLocal());
} }
@Override
public ActiveMQDestination getMappedDestinations() {
return new ActiveMQQueue(prefix + name + postfix);
}
@Override
public Destination interceptMappedDestination(Destination destination) {
// do a reverse map from destination to get actual virtual destination
final String physicalName = destination.getActiveMQDestination().getPhysicalName();
final Pattern pattern = Pattern.compile(getRegex(prefix) + "(.*)" + getRegex(postfix));
final Matcher matcher = pattern.matcher(physicalName);
if (matcher.matches()) {
final String virtualName = matcher.group(1);
return new MappedQueueFilter(new ActiveMQTopic(virtualName), destination);
}
return destination;
}
private String getRegex(String part) {
StringBuilder builder = new StringBuilder();
for (char c : part.toCharArray()) {
switch (c) {
case '.':
builder.append("\\.");
break;
case '*':
builder.append("[^\\.]*");
break;
default:
builder.append(c);
}
}
return builder.toString();
}
@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) { if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty()) {
DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT)); DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
@ -62,9 +99,10 @@ public class VirtualTopic implements VirtualDestination {
} }
} }
public void remove(Destination destination) { @Override
public void remove(Destination destination) {
} }
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -98,17 +136,19 @@ public class VirtualTopic implements VirtualDestination {
public void setName(String name) { public void setName(String name) {
this.name = name; this.name = name;
} }
/** /**
* Indicates whether the selectors of consumers are used to determine dispatch * Indicates whether the selectors of consumers are used to determine
* to a virtual destination, when true only messages matching an existing * dispatch to a virtual destination, when true only messages matching an
* consumer will be dispatched. * existing consumer will be dispatched.
* @param selectorAware when true take consumer selectors into consideration *
* @param selectorAware
* when true take consumer selectors into consideration
*/ */
public void setSelectorAware(boolean selectorAware) { public void setSelectorAware(boolean selectorAware) {
this.selectorAware = selectorAware; this.selectorAware = selectorAware;
} }
public boolean isSelectorAware() { public boolean isSelectorAware() {
return selectorAware; return selectorAware;
} }
@ -123,6 +163,8 @@ public class VirtualTopic implements VirtualDestination {
@Override @Override
public String toString() { public String toString() {
return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').append(postfix).append(',').append(selectorAware).append(',').append(local).toString(); return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').
append(postfix).append(',').append(selectorAware).
append(',').append(local).toString();
} }
} }

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
import org.junit.Test;
/**
*
*/
public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport {
// configure composite topic
private static final String COMPOSITE_TOPIC = "Composite.TopicA";
private static final String FORWARD_QUEUE = "Composite.Queue.A";
private static final String FORWARD_TOPIC = "Composite.Topic.A";
private static final int NUM_MESSAGES = 25;
@Override
protected void createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(isPersistent());
brokerService.setAdvisorySupport(false);
brokerService.setSchedulerSupport(isSchedulerSupportEnabled());
brokerService.setPopulateJMSXUserID(true);
final CompositeTopic compositeTopic = new CompositeTopic();
compositeTopic.setName(COMPOSITE_TOPIC);
final ArrayList<ActiveMQDestination> forwardDestinations = new ArrayList<ActiveMQDestination>();
forwardDestinations.add(new ActiveMQQueue(FORWARD_QUEUE));
forwardDestinations.add(new ActiveMQTopic(FORWARD_TOPIC));
compositeTopic.setForwardTo(forwardDestinations);
// NOTE: allows retained messages to be set on the Composite
compositeTopic.setForwardOnly(false);
final VirtualDestinationInterceptor destinationInterceptor = new VirtualDestinationInterceptor();
destinationInterceptor.setVirtualDestinations(new VirtualDestination[] {compositeTopic} );
brokerService.setDestinationInterceptors(new DestinationInterceptor[] { destinationInterceptor });
}
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMSCompositeDestinations() throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
// send retained message
final String MQTT_TOPIC = "Composite/TopicA";
final String RETAINED = "RETAINED";
provider.publish(MQTT_TOPIC, RETAINED.getBytes(), AT_LEAST_ONCE, true);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection();
// MUST set to true to receive retained messages
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.setClientID("jms-client");
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = s.createQueue(FORWARD_QUEUE);
javax.jms.Topic jmsTopic = s.createTopic(FORWARD_TOPIC);
MessageConsumer queueConsumer = s.createConsumer(jmsQueue);
MessageConsumer topicConsumer = s.createDurableSubscriber(jmsTopic, "jms-subscription");
// check whether we received retained message twice on mapped Queue, once marked as RETAINED
ActiveMQMessage message;
ByteSequence bs;
for (int i = 0; i < 2; i++) {
message = (ActiveMQMessage) queueConsumer.receive(5000);
assertNotNull("Should get retained message from " + FORWARD_QUEUE, message);
bs = message.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY) != message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
}
// check whether we received retained message on mapped Topic
message = (ActiveMQMessage) topicConsumer.receive(5000);
assertNotNull("Should get retained message from " + FORWARD_TOPIC, message);
bs = message.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertFalse(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish(MQTT_TOPIC, payload.getBytes(), AT_LEAST_ONCE);
message = (ActiveMQMessage) queueConsumer.receive(5000);
assertNotNull("Should get a message from " + FORWARD_QUEUE, message);
bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
message = (ActiveMQMessage) topicConsumer.receive(5000);
assertNotNull("Should get a message from " + FORWARD_TOPIC, message);
bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
}
// close consumer and look for retained messages again
queueConsumer.close();
topicConsumer.close();
queueConsumer = s.createConsumer(jmsQueue);
topicConsumer = s.createDurableSubscriber(jmsTopic, "jms-subscription");
// check whether we received retained message on mapped Queue, again
message = (ActiveMQMessage) queueConsumer.receive(5000);
assertNotNull("Should get recovered retained message from " + FORWARD_QUEUE, message);
bs = message.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
assertNull("Should not get second retained message from " + FORWARD_QUEUE, queueConsumer.receive(5000));
// check whether we received retained message on mapped Topic, again
message = (ActiveMQMessage) topicConsumer.receive(5000);
assertNotNull("Should get recovered retained message from " + FORWARD_TOPIC, message);
bs = message.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
assertNull("Should not get second retained message from " + FORWARD_TOPIC, topicConsumer.receive(5000));
// create second queue consumer and verify that it doesn't trigger message recovery
final MessageConsumer queueConsumer2 = s.createConsumer(jmsQueue);
assertNull("Second consumer MUST not receive retained message from " + FORWARD_QUEUE, queueConsumer2.receive(5000));
activeMQConnection.close();
provider.disconnect();
}
}

View File

@ -41,10 +41,12 @@ import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
@ -1426,6 +1428,55 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]); assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
} }
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMSVirtualTopic() throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
final String DESTINATION_NAME = "Consumer.jms.VirtualTopic.TopicA";
// send retained message
final String RETAINED = "RETAINED";
final String MQTT_DESTINATION_NAME = "VirtualTopic/TopicA";
provider.publish(MQTT_DESTINATION_NAME, RETAINED.getBytes(), AT_LEAST_ONCE, true);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection();
// MUST set to true to receive retained messages
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue jmsQueue = s.createQueue(DESTINATION_NAME);
MessageConsumer consumer = s.createConsumer(jmsQueue);
// check whether we received retained message on JMS subscribe
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get retained message", message);
ByteSequence bs = message.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Test Message: " + i;
provider.publish(MQTT_DESTINATION_NAME, payload.getBytes(), AT_LEAST_ONCE);
message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get a message", message);
bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
}
// re-create consumer and check we received retained message again
consumer.close();
consumer = s.createConsumer(jmsQueue);
message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get retained message", message);
bs = message.getContent();
assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
activeMQConnection.close();
provider.disconnect();
}
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)
public void testPingOnMQTT() throws Exception { public void testPingOnMQTT() throws Exception {
stopBroker(); stopBroker();