mirror of https://github.com/apache/activemq.git
Fixes AMQ-4896 - MQTT does not properly restore durable subs with the Paho client.
This commit is contained in:
parent
de58386607
commit
bc4f4e92a6
|
@ -137,6 +137,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
this.stopTaskRunnerFactory = stopTaskRunnerFactory;
|
||||
this.transport = transport;
|
||||
final BrokerService brokerService = this.broker.getBrokerService();
|
||||
if( this.transport instanceof BrokerServiceAware ) {
|
||||
((BrokerServiceAware)this.transport).setBrokerService(brokerService);
|
||||
}
|
||||
this.transport.setTransportListener(new DefaultTransportListener() {
|
||||
@Override
|
||||
public void onCommand(Object o) {
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.SubscriptionInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used to implement common PersistenceAdapter methods.
|
||||
*/
|
||||
public class PersistenceAdapterSupport {
|
||||
|
||||
static public List<SubscriptionInfo> listSubscriptions(PersistenceAdapter pa, String clientId) throws IOException {
|
||||
ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
|
||||
for (ActiveMQDestination destination : pa.getDestinations()) {
|
||||
if( destination.isTopic() ) {
|
||||
TopicMessageStore store = pa.createTopicMessageStore((ActiveMQTopic) destination);
|
||||
for (SubscriptionInfo sub : store.getAllSubscriptions()) {
|
||||
if(clientId==sub.getClientId() || clientId.equals(sub.getClientId()) ) {
|
||||
rc.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
}
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.ws;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.transport.TransportSupport;
|
||||
import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
|
||||
|
@ -35,13 +37,14 @@ import java.io.IOException;
|
|||
import java.security.cert.X509Certificate;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport {
|
||||
public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
|
||||
Connection outbound;
|
||||
MQTTProtocolConverter protocolConverter = new MQTTProtocolConverter(this, null);
|
||||
MQTTProtocolConverter protocolConverter = null;
|
||||
MQTTWireFormat wireFormat = new MQTTWireFormat();
|
||||
private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
|
||||
private BrokerService brokerService;
|
||||
|
||||
@Override
|
||||
public void onMessage(byte[] bytes, int offset, int length) {
|
||||
|
@ -56,12 +59,19 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM
|
|||
|
||||
try {
|
||||
MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
|
||||
protocolConverter.onMQTTCommand(frame);
|
||||
getProtocolConverter().onMQTTCommand(frame);
|
||||
} catch (Exception e) {
|
||||
onException(IOExceptionSupport.create(e));
|
||||
}
|
||||
}
|
||||
|
||||
private MQTTProtocolConverter getProtocolConverter() {
|
||||
if( protocolConverter == null ) {
|
||||
protocolConverter = new MQTTProtocolConverter(this, brokerService);
|
||||
}
|
||||
return protocolConverter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(Connection connection) {
|
||||
this.outbound = connection;
|
||||
|
@ -70,7 +80,7 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM
|
|||
@Override
|
||||
public void onClose(int closeCode, String message) {
|
||||
try {
|
||||
protocolConverter.onMQTTCommand(new DISCONNECT().encode());
|
||||
getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to close WebSocket", e);
|
||||
}
|
||||
|
@ -101,7 +111,7 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM
|
|||
@Override
|
||||
public void oneway(Object command) throws IOException {
|
||||
try {
|
||||
protocolConverter.onActiveMQCommand((Command)command);
|
||||
getProtocolConverter().onActiveMQCommand((Command) command);
|
||||
} catch (Exception e) {
|
||||
onException(IOExceptionSupport.create(e));
|
||||
}
|
||||
|
@ -132,4 +142,9 @@ public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryM
|
|||
public MQTTWireFormat getWireFormat() {
|
||||
return wireFormat;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
this.brokerService = brokerService;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
|
|||
*/
|
||||
public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
|
||||
|
||||
private BrokerContext brokerContext = null;
|
||||
private BrokerService brokerService = null;
|
||||
|
||||
protected String getDefaultWireFormatType() {
|
||||
return "mqtt";
|
||||
|
@ -77,13 +77,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
|
|||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
transport = new MQTTTransportFilter(transport, format, brokerContext);
|
||||
transport = new MQTTTransportFilter(transport, format, brokerService);
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
return super.compositeConfigure(transport, format, options);
|
||||
}
|
||||
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
this.brokerContext = brokerService.getBrokerContext();
|
||||
this.brokerService = brokerService;
|
||||
}
|
||||
|
||||
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package org.apache.activemq.transport.mqtt;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -28,29 +30,10 @@ import javax.jms.JMSException;
|
|||
import javax.jms.Message;
|
||||
|
||||
import org.apache.activemq.broker.BrokerContext;
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.ConnectionError;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.ExceptionResponse;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.RemoveInfo;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.command.SessionId;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.activemq.command.ShutdownInfo;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.store.PersistenceAdapterSupport;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
|
@ -102,6 +85,7 @@ public class MQTTProtocolConverter {
|
|||
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
|
||||
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
|
||||
private final MQTTTransport mqttTransport;
|
||||
private final BrokerService brokerService;
|
||||
|
||||
private final Object commnadIdMutex = new Object();
|
||||
private int lastCommandId;
|
||||
|
@ -113,8 +97,9 @@ public class MQTTProtocolConverter {
|
|||
private int activeMQSubscriptionPrefetch=1;
|
||||
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
|
||||
|
||||
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
|
||||
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
|
||||
this.mqttTransport = mqttTransport;
|
||||
this.brokerService = brokerService;
|
||||
this.defaultKeepAlive = 0;
|
||||
}
|
||||
|
||||
|
@ -269,12 +254,43 @@ public class MQTTProtocolConverter {
|
|||
connected.set(true);
|
||||
getMQTTTransport().sendToMQTT(ack.encode());
|
||||
|
||||
List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
|
||||
if( connect.cleanSession() ) {
|
||||
deleteDurableSubs(subs);
|
||||
} else {
|
||||
restoreDurableSubs(subs);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void deleteDurableSubs(List<SubscriptionInfo> subs) {
|
||||
try {
|
||||
for (SubscriptionInfo sub : subs) {
|
||||
TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination());
|
||||
store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not delete the MQTT durable subs.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void restoreDurableSubs(List<SubscriptionInfo> subs) {
|
||||
try {
|
||||
SUBSCRIBE command = new SUBSCRIBE();
|
||||
for (SubscriptionInfo sub : subs) {
|
||||
String name = sub.getSubcriptionName();
|
||||
String[] split = name.split(":", 2);
|
||||
QoS qoS = QoS.valueOf(split[0]);
|
||||
onSubscribe(new Topic(split[1], qoS));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Could not restore the MQTT durable subs.", e);
|
||||
}
|
||||
}
|
||||
|
||||
void onMQTTDisconnect() throws MQTTProtocolException {
|
||||
if (connected.get()) {
|
||||
connected.set(false);
|
||||
|
@ -290,7 +306,7 @@ public class MQTTProtocolConverter {
|
|||
if (topics != null) {
|
||||
byte[] qos = new byte[topics.length];
|
||||
for (int i = 0; i < topics.length; i++) {
|
||||
qos[i] = (byte) onSubscribe(command, topics[i]).ordinal();
|
||||
qos[i] = (byte) onSubscribe(topics[i]).ordinal();
|
||||
}
|
||||
SUBACK ack = new SUBACK();
|
||||
ack.messageId(command.messageId());
|
||||
|
@ -305,25 +321,25 @@ public class MQTTProtocolConverter {
|
|||
}
|
||||
}
|
||||
|
||||
QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
|
||||
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
|
||||
QoS onSubscribe(Topic topic) throws MQTTProtocolException {
|
||||
if( !mqttSubscriptionByTopic.containsKey(topic.name()) ) {
|
||||
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
|
||||
|
||||
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
||||
ConsumerInfo consumerInfo = new ConsumerInfo(id);
|
||||
consumerInfo.setDestination(destination);
|
||||
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
|
||||
consumerInfo.setDispatchAsync(true);
|
||||
if (!connect.cleanSession() && (connect.clientId() != null)) {
|
||||
//by default subscribers are persistent
|
||||
consumerInfo.setSubscriptionName(
|
||||
connect.clientId().toString() + topic.name().toString());
|
||||
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
|
||||
ConsumerInfo consumerInfo = new ConsumerInfo(id);
|
||||
consumerInfo.setDestination(destination);
|
||||
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
|
||||
consumerInfo.setDispatchAsync(true);
|
||||
if (!connect.cleanSession() && (connect.clientId() != null)) {
|
||||
consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
|
||||
}
|
||||
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
|
||||
|
||||
subscriptionsByConsumerId.put(id, mqttSubscription);
|
||||
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
|
||||
|
||||
sendToActiveMQ(consumerInfo, null);
|
||||
}
|
||||
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
|
||||
|
||||
subscriptionsByConsumerId.put(id, mqttSubscription);
|
||||
mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
|
||||
|
||||
sendToActiveMQ(consumerInfo, null);
|
||||
return topic.qos();
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.activemq.wireformat.WireFormat;
|
|||
*/
|
||||
public class MQTTSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
|
||||
|
||||
private BrokerContext brokerContext = null;
|
||||
private BrokerService brokerService = null;
|
||||
|
||||
protected String getDefaultWireFormatType() {
|
||||
return "mqtt";
|
||||
|
@ -42,7 +42,7 @@ public class MQTTSslTransportFactory extends SslTransportFactory implements Brok
|
|||
@SuppressWarnings("rawtypes")
|
||||
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
transport = new MQTTTransportFilter(transport, format, brokerContext);
|
||||
transport = new MQTTTransportFilter(transport, format, brokerService);
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
return super.compositeConfigure(transport, format, options);
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ public class MQTTSslTransportFactory extends SslTransportFactory implements Brok
|
|||
}
|
||||
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
this.brokerContext = brokerService.getBrokerContext();
|
||||
this.brokerService = brokerService;
|
||||
}
|
||||
|
||||
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
|
||||
|
|
|
@ -40,7 +40,7 @@ import javax.net.ServerSocketFactory;
|
|||
*/
|
||||
public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
|
||||
|
||||
private BrokerContext brokerContext = null;
|
||||
private BrokerService brokerService = null;
|
||||
|
||||
protected String getDefaultWireFormatType() {
|
||||
return "mqtt";
|
||||
|
@ -54,13 +54,13 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
|
|||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||
transport = new MQTTTransportFilter(transport, format, brokerContext);
|
||||
transport = new MQTTTransportFilter(transport, format, brokerService);
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
return super.compositeConfigure(transport, format, options);
|
||||
}
|
||||
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
this.brokerContext = brokerService.getBrokerContext();
|
||||
this.brokerService = brokerService;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.broker.BrokerContext;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFilter;
|
||||
|
@ -50,9 +51,9 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
|
|||
|
||||
private boolean trace;
|
||||
|
||||
public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
|
||||
public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
|
||||
super(next);
|
||||
this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
|
||||
this.protocolConverter = new MQTTProtocolConverter(this, brokerService);
|
||||
|
||||
if (wireFormat instanceof MQTTWireFormat) {
|
||||
this.wireFormat = (MQTTWireFormat) wireFormat;
|
||||
|
|
Loading…
Reference in New Issue