Partial fix for AMQ-5160, attempts to resolve retained messages using subscription recovery policy, but fails to resend retained messages for duplicate subscriptions

This commit is contained in:
Dhiraj Bokde 2014-05-07 19:05:36 -07:00 committed by Dejan Bosanac
parent ba519d8bd3
commit bcb60a482c
4 changed files with 215 additions and 63 deletions

View File

@ -33,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
@ -91,7 +91,7 @@ public class Topic extends BaseDestination implements Task {
subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
setAlwaysRetroactive(true);
} else {
subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
}
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
@ -675,8 +675,14 @@ public class Topic extends BaseDestination implements Task {
return subscriptionRecoveryPolicy;
}
public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
// allow users to combine retained message policy with other ActiveMQ policies
RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
policy.setWrapped(recoveryPolicy);
} else {
this.subscriptionRecoveryPolicy = recoveryPolicy;
}
}
// Implementation methods

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.DestinationFilter;
/**
* This implementation of {@link org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy} will only keep the
* last non-zero length message with the {@link org.apache.activemq.command.ActiveMQMessage}.RETAIN_PROPERTY.
*
* @org.apache.xbean.XBean
*
*/
public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
public static final String RETAIN_PROPERTY = "ActiveMQRetain";
public static final String RETAINED_PROPERTY = "ActiveMQRetained";
private volatile MessageReference retainedMessage;
private SubscriptionRecoveryPolicy wrapped;
public RetainedMessageSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy wrapped) {
this.wrapped = wrapped;
}
public boolean add(ConnectionContext context, MessageReference node) throws Exception {
final Message message = node.getMessage();
final Object retainValue = message.getProperty(RETAIN_PROPERTY);
// retain property set to true
final boolean retain = retainValue != null && Boolean.parseBoolean(retainValue.toString());
if (retain) {
if (message.getContent().getLength() > 0) {
// non zero length message content
retainedMessage = message.copy();
retainedMessage.getMessage().setProperty(RETAINED_PROPERTY, true);
} else {
// clear retained message
retainedMessage = null;
}
// TODO should we remove the publisher's retain property??
node.getMessage().removeProperty(RETAIN_PROPERTY);
}
return wrapped == null ? true : wrapped.add(context, node);
}
public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
// Re-dispatch the last retained message seen.
if (retainedMessage != null) {
sub.addRecoveredMessage(context, retainedMessage);
}
if (wrapped != null) {
wrapped.recover(context, topic, sub);
}
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public Message[] browse(ActiveMQDestination destination) throws Exception {
List<Message> result = new ArrayList<Message>();
if (retainedMessage != null) {
DestinationFilter filter = DestinationFilter.parseFilter(destination);
if (filter.matches(retainedMessage.getMessage().getDestination())) {
result.add(retainedMessage.getMessage());
}
}
return result.toArray(new Message[result.size()]);
}
public SubscriptionRecoveryPolicy copy() {
return new RetainedMessageSubscriptionRecoveryPolicy(wrapped);
}
public void setBroker(Broker broker) {
}
public void setWrapped(SubscriptionRecoveryPolicy wrapped) {
this.wrapped = wrapped;
}
}

View File

@ -28,6 +28,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.util.ByteArrayOutputStream;
@ -80,13 +81,11 @@ public class MQTTProtocolConverter {
private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch=1;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
private final MQTTRetainedMessages retainedMessages;
private final MQTTPacketIdGenerator packetIdGenerator;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
this.brokerService = brokerService;
this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService);
this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
this.defaultKeepAlive = 0;
}
@ -344,36 +343,6 @@ public class MQTTProtocolConverter {
} catch (IOException e) {
LOG.warn("Couldn't send SUBACK for " + command, e);
}
// check retained messages
for (int i = 0; i < topics.length; i++) {
if (qos[i] == SUBSCRIBE_ERROR) {
// skip this topic if subscribe failed
continue;
}
final Topic topic = topics[i];
ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
for (PUBLISH msg : retainedMessages.getMessages(destination)) {
if( msg.payload().length > 0 ) {
try {
PUBLISH retainedCopy = new PUBLISH();
retainedCopy.topicName(msg.topicName());
retainedCopy.retain(msg.retain());
retainedCopy.payload(msg.payload());
// set QoS of retained message to maximum of subscription QoS
retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
switch (retainedCopy.qos()) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
retainedCopy.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
case AT_MOST_ONCE:
}
getMQTTTransport().sendToMQTT(retainedCopy.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
}
}
}
}
} else {
LOG.warn("No topics defined for Subscription " + command);
}
@ -382,28 +351,33 @@ public class MQTTProtocolConverter {
byte onSubscribe(final Topic topic) throws MQTTProtocolException {
if( mqttSubscriptionByTopic.containsKey(topic.name())) {
if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
final UTF8Buffer topicName = topic.name();
final QoS topicQoS = topic.qos();
if( mqttSubscriptionByTopic.containsKey(topicName)) {
if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
// remove old subscription as the QoS has changed
onUnSubscribe(topic.name());
onUnSubscribe(topicName);
} else {
// duplicate SUBSCRIBE packet, nothing to do
return (byte) topic.qos().ordinal();
// duplicate SUBSCRIBE packet
// TODO find all matching topics and resend retained messages
return (byte) topicQoS.ordinal();
}
onUnSubscribe(topicName);
}
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when cleansession is false
if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
final byte[] qos = {-1};
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@ -412,17 +386,17 @@ public class MQTTProtocolConverter {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.warn("Error subscribing to " + topic.name(), throwable);
LOG.warn("Error subscribing to " + topicName, throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
qos[0] = (byte) topic.qos().ordinal();
qos[0] = (byte) topicQoS.ordinal();
}
}
});
if (qos[0] != SUBSCRIBE_ERROR) {
subscriptionsByConsumerId.put(id, mqttSubscription);
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
}
return qos[0];
@ -508,9 +482,6 @@ public class MQTTProtocolConverter {
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
ActiveMQMessage message = convertMessage(command);
if (command.retain()){
retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command);
}
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
@ -570,6 +541,9 @@ public class MQTTProtocolConverter {
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
if (command.retain()) {
msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
}
ActiveMQTopic topic;
synchronized (activeMQTopicMap) {
@ -597,6 +571,9 @@ public class MQTTProtocolConverter {
qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
}
result.qos(qoS);
if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
result.retain(true);
}
UTF8Buffer topicName;
synchronized (mqttTopicMap) {

View File

@ -16,16 +16,17 @@
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
import java.net.ProtocolException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -34,15 +35,25 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.security.*;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
@ -368,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertNotNull("No message for " + topic, msg);
assertEquals(RETAINED + topic, new String(msg.getPayload()));
msg.ack();
@ -390,16 +401,17 @@ public class MQTTTest extends AbstractMQTTTest {
connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) });
final byte[] qos = connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
// test retained messages
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
msg.ack();
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
} while (msg != null);
// connection is borked after timeout in connection.receive()
@ -499,10 +511,10 @@ public class MQTTTest extends AbstractMQTTTest {
QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
for (QoS qos : qoss) {
connection.subscribe(new Topic[] { new Topic("TopicA", qos) });
connection.subscribe(new Topic[]{new Topic("TopicA", qos)});
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertNotNull("No message for " + qos, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
int waitCount = 0;
@ -1340,6 +1352,56 @@ public class MQTTTest extends AbstractMQTTTest {
assertNull("Shouldn't receive the message", msg);
}
@Test(timeout = 60 * 1000)
public void testActiveMQRecoveryPolicy() throws Exception {
addMQTTConnector();
brokerService.start();
// test with ActiveMQ LastImageSubscriptionRecoveryPolicy
final PolicyMap policyMap = new PolicyMap();
final PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
policyMap.put(new ActiveMQTopic(">"), policyEntry);
brokerService.setDestinationPolicy(policyMap);
MQTT mqtt = createMQTTConnection("pub-sub", true);
final int[] retain = new int[1];
final int[] nonretain = new int[1];
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
if (frame.messageType() == PUBLISH.TYPE) {
LOG.info("Received message with retain=" + frame.retain());
if (frame.retain()) {
retain[0]++;
} else {
nonretain[0]++;
}
}
}
});
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String RETAINED = "RETAINED";
connection.publish("one", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
connection.publish("two", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
final String NONRETAINED = "NONRETAINED";
connection.publish("one", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
connection.publish("two", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
connection.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
for (int i = 0; i < 4; i++) {
final Message message = connection.receive(30, TimeUnit.SECONDS);
assertNotNull("Should receive 4 messages", message);
message.ack();
}
assertEquals("Should receive 2 retained messages", 2, retain[0]);
assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
}
@Override
protected String getProtocolScheme() {
return "mqtt";