Fixed AMQ-5160, fixed durable subscription retroactive recovery

This commit is contained in:
Dhiraj Bokde 2014-05-16 14:21:19 -07:00 committed by Dejan Bosanac
parent 42ad1039cb
commit 6c859676b3
4 changed files with 55 additions and 21 deletions

View File

@ -23,7 +23,6 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -120,9 +119,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
if (active.get() || keepDurableSubsActive) { if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic) destination; Topic topic = (Topic) destination;
topic.activate(context, this); topic.activate(context, this);
if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
topic.recoverRetroactiveMessages(context, this);
}
this.enqueueCounter += pending.size(); this.enqueueCounter += pending.size();
} else if (destination.getMessageStore() != null) { } else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@ -172,12 +168,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.setMaxAuditDepth(getMaxAuditDepth()); pending.setMaxAuditDepth(getMaxAuditDepth());
pending.setMaxProducersToAudit(getMaxProducersToAudit()); pending.setMaxProducersToAudit(getMaxProducersToAudit());
pending.start(); pending.start();
// use recovery policy for retroactive topics and consumers }
for (Destination destination : durableDestinations.values()) { // use recovery policy every time sub is activated for retroactive topics and consumers
Topic topic = (Topic) destination; for (Destination destination : durableDestinations.values()) {
if (topic.isAlwaysRetroactive() || info.isRetroactive()) { Topic topic = (Topic) destination;
topic.recoverRetroactiveMessages(context, this); if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
} topic.recoverRetroactiveMessages(context, this);
} }
} }
} }
@ -277,7 +273,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
@Override @Override
protected void dispatchPending() throws IOException { public void dispatchPending() throws IOException {
if (isActive()) { if (isActive()) {
super.dispatchPending(); super.dispatchPending();
} }

View File

@ -633,7 +633,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dispatched.removeAll(references); dispatched.removeAll(references);
} }
protected void dispatchPending() throws IOException { // made public so it can be used in MQTTProtocolConverter
public void dispatchPending() throws IOException {
synchronized(pendingLock) { synchronized(pendingLock) {
try { try {
int numberToDispatch = countBeforeFull(); int numberToDispatch = countBeforeFull();

View File

@ -17,6 +17,8 @@
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -30,6 +32,7 @@ import javax.jms.Message;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicRegion;
@ -71,6 +74,8 @@ public class MQTTProtocolConverter {
private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>(); private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE); private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE); private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
@ -317,6 +322,8 @@ public class MQTTProtocolConverter {
String[] split = name.split(":", 2); String[] split = name.split(":", 2);
QoS qoS = QoS.valueOf(split[0]); QoS qoS = QoS.valueOf(split[0]);
onSubscribe(new Topic(split[1], qoS)); onSubscribe(new Topic(split[1], qoS));
// mark this durable subscription as restored by Broker
restoredSubs.add(split[1]);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Could not restore the MQTT durable subs.", e); LOG.warn("Could not restore the MQTT durable subs.", e);
@ -416,6 +423,12 @@ public class MQTTProtocolConverter {
private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination, private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
MQTTSubscription mqttSubscription) throws MQTTProtocolException { MQTTSubscription mqttSubscription) throws MQTTProtocolException {
// check whether the Topic has been recovered in restoreDurableSubs
// mark subscription available for recovery for duplicate subscription
if (restoredSubs.remove(destination.getPhysicalName())) {
return;
}
// get TopicRegion // get TopicRegion
RegionBroker regionBroker; RegionBroker regionBroker;
try { try {
@ -441,6 +454,11 @@ public class MQTTProtocolConverter {
if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
try { try {
((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
if (subscription instanceof PrefetchSubscription) {
// request dispatch for prefetch subs
PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
prefetchSubscription.dispatchPending();
}
} catch (Exception e) { } catch (Exception e) {
throw new MQTTProtocolException("Error recovering retained messages for " + throw new MQTTProtocolException("Error recovering retained messages for " +
dest.getName() + ": " + e.getMessage(), false, e); dest.getName() + ": " + e.getMessage(), false, e);
@ -479,6 +497,9 @@ public class MQTTProtocolConverter {
// check if the durable sub also needs to be removed // check if the durable sub also needs to be removed
if (subs.getConsumerInfo().getSubscriptionName() != null) { if (subs.getConsumerInfo().getSubscriptionName() != null) {
// also remove it from restored durable subscriptions set
restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString()));
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(connectionId); rsi.setConnectionId(connectionId);
rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName()); rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());

View File

@ -526,55 +526,71 @@ public class MQTTTest extends AbstractMQTTTest {
} }
@Test(timeout = 60 * 1000) @Test(timeout = 120 * 1000)
public void testRetainedMessage() throws Exception { public void testRetainedMessage() throws Exception {
addMQTTConnector(); addMQTTConnector();
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive((short) 2); mqtt.setKeepAlive((short) 2);
mqtt.setCleanSession(true);
final String RETAIN = "RETAIN"; final String RETAIN = "RETAIN";
final String TOPICA = "TopicA"; final String TOPICA = "TopicA";
final String[] clientIds = { null, "foo" }; final String[] clientIds = { null, "foo", "durable" };
for (String clientId : clientIds) { for (String clientId : clientIds) {
mqtt.setClientId(clientId); mqtt.setClientId(clientId);
final BlockingConnection connection = mqtt.blockingConnection(); mqtt.setCleanSession(!"durable".equals(clientId));
BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
// set retained message and check // set retained message and check
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("No retained message for " + clientId, msg); assertNotNull("No retained message for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload())); assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack(); msg.ack();
assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
// test duplicate subscription // test duplicate subscription
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(5000, TimeUnit.MILLISECONDS); msg = connection.receive(15000, TimeUnit.MILLISECONDS);
assertNotNull("No retained message on duplicate subscription for " + clientId, msg); assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload())); assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack(); msg.ack();
assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
connection.unsubscribe(new String[]{"TopicA"}); connection.unsubscribe(new String[]{"TopicA"});
// clear retained message and check that we don't receive it // clear retained message and check that we don't receive it
connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true); connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(5000, TimeUnit.MILLISECONDS); msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNull("Retained message not cleared for " + clientId, msg); assertNull("Retained message not cleared for " + clientId, msg);
connection.unsubscribe(new String[]{"TopicA"}); connection.unsubscribe(new String[]{"TopicA"});
// set retained message again and check // set retained message again and check
connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true); connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_MOST_ONCE)}); connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(5000, TimeUnit.MILLISECONDS); msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("No reset retained message for " + clientId, msg); assertNotNull("No reset retained message for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload())); assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack(); msg.ack();
assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
// re-connect and check
connection.disconnect();
connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull("No reset retained message for " + clientId, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
assertNull(connection.receive(5000, TimeUnit.MILLISECONDS));
connection.unsubscribe(new String[]{"TopicA"}); connection.unsubscribe(new String[]{"TopicA"});
connection.disconnect(); connection.disconnect();