This closes #1002 gitkMqtt ack clean session

This commit is contained in:
Andy Taylor 2017-02-09 11:36:13 +00:00
commit 8a88a56693
8 changed files with 109 additions and 67 deletions

View File

@ -67,12 +67,12 @@ public class MQTTConnectionManager {
return;
}
session.setSessionState(getSessionState(clientId, cleanSession));
session.setSessionState(getSessionState(clientId));
ServerSessionImpl serverSession = createServerSession(username, password);
serverSession.start();
session.setServerSession(serverSession);
session.setIsClean(cleanSession);
if (will) {
ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain);
@ -96,8 +96,20 @@ public class MQTTConnectionManager {
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null,
session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext(), session.getProtocolManager().getPrefixes());
ServerSession serverSession = server.createSession(id,
username,
password,
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
session.getConnection(),
MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
MQTTUtil.SESSION_AUTO_COMMIT_ACKS,
MQTTUtil.SESSION_PREACKNOWLEDGE,
MQTTUtil.SESSION_XA,
null,
session.getSessionCallback(),
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
server.newOperationContext(),
session.getProtocolManager().getPrefixes());
return (ServerSessionImpl) serverSession;
}
@ -131,29 +143,25 @@ public class MQTTConnectionManager {
session.getSessionState().deleteWillMessage();
}
private MQTTSessionState getSessionState(String clientId, boolean cleanSession) throws InterruptedException {
private MQTTSessionState getSessionState(String clientId) throws InterruptedException {
synchronized (MQTTSession.SESSIONS) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
if (cleanSession) {
MQTTSession.SESSIONS.remove(clientId);
return new MQTTSessionState(clientId);
} else {
/* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
a new one. */
MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
if (state != null) {
// TODO Add a count down latch for handling wait during attached session state.
while (state.getAttached()) {
Thread.sleep(1000);
}
return state;
} else {
state = new MQTTSessionState(clientId);
MQTTSession.SESSIONS.put(clientId, state);
return state;
/* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
a new one. */
MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
if (state != null) {
// TODO Add a count down latch for handling wait during attached session state.
while (state.getAttached()) {
Thread.sleep(1000);
}
return state;
} else {
state = new MQTTSessionState(clientId);
MQTTSession.SESSIONS.put(clientId, state);
return state;
}
}
}

View File

@ -67,13 +67,19 @@ public class MQTTPublishManager {
createManagementConsumer();
}
synchronized void stop(boolean clean) throws Exception {
synchronized void stop() throws Exception {
if (managementConsumer != null) {
managementConsumer.removeItself();
managementConsumer.setStarted(false);
managementConsumer.close(false);
if (clean)
session.getServer().destroyQueue(managementAddress);
}
}
void clean() throws Exception {
createManagementAddress();
Queue queue = session.getServer().locateQueue(managementAddress);
if (queue != null) {
queue.deleteQueue();
}
}
@ -84,7 +90,7 @@ public class MQTTPublishManager {
}
private void createManagementAddress() {
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
}
private void createManagementQueue() throws Exception {

View File

@ -55,6 +55,8 @@ public class MQTTSession {
private MQTTProtocolManager protocolManager;
private boolean isClean;
public MQTTSession(MQTTProtocolHandler protocolHandler,
MQTTConnection connection,
MQTTProtocolManager protocolManager) throws Exception {
@ -83,9 +85,8 @@ public class MQTTSession {
synchronized void stop() throws Exception {
if (!stopped) {
protocolHandler.stop(false);
// TODO this should pass in clean session.
subscriptionManager.stop(false);
mqttPublishManager.stop(false);
subscriptionManager.stop();
mqttPublishManager.stop();
if (serverSession != null) {
serverSession.stop();
@ -95,6 +96,10 @@ public class MQTTSession {
if (state != null) {
state.setAttached(false);
}
if (isClean()) {
clean();
}
}
stopped = true;
}
@ -103,6 +108,17 @@ public class MQTTSession {
return stopped;
}
boolean isClean() {
return isClean;
}
void setIsClean(boolean isClean) throws Exception {
this.isClean = isClean;
if (isClean) {
clean();
}
}
MQTTPublishManager getMqttPublishManager() {
return mqttPublishManager;
}
@ -159,4 +175,10 @@ public class MQTTSession {
MQTTProtocolManager getProtocolManager() {
return protocolManager;
}
void clean() throws Exception {
subscriptionManager.clean();
mqttPublishManager.clean();
state.clear();
}
}

View File

@ -39,39 +39,27 @@ public class MQTTSessionState {
private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
// Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
private Map<Integer, MQTTMessageInfo> messageRefStore;
private final Map<Integer, MQTTMessageInfo> messageRefStore = new ConcurrentHashMap<>();
private ConcurrentMap<String, Map<Long, Integer>> addressMessageMap;
private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>();
private Set<Integer> pubRec;
private Set<Integer> pub;
private final Set<Integer> pubRec = new HashSet<>();
private boolean attached = false;
// Objects track the Outbound message references
private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
private ConcurrentMap<String, ConcurrentMap<Long, Integer>> reverseOutboundReferenceStore;
private final Object outboundLock = new Object();
// FIXME We should use a better mechanism for creating packet IDs.
private AtomicInteger lastId = new AtomicInteger(0);
private final OutboundStore outboundStore = new OutboundStore();
public MQTTSessionState(String clientId) {
this.clientId = clientId;
}
pubRec = new HashSet<>();
pub = new HashSet<>();
outboundMessageReferenceStore = new ConcurrentHashMap<>();
reverseOutboundReferenceStore = new ConcurrentHashMap<>();
messageRefStore = new ConcurrentHashMap<>();
addressMessageMap = new ConcurrentHashMap<>();
public synchronized void clear() {
subscriptions.clear();
messageRefStore.clear();
addressMessageMap.clear();
pubRec.clear();
outboundStore.clear();
willMessage = null;
}
OutboundStore getOutboundStore() {
@ -159,9 +147,9 @@ public class MQTTSessionState {
public class OutboundStore {
private final HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
private HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
private final HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
private final Object dataStoreLock = new Object();
@ -202,5 +190,13 @@ public class MQTTSessionState {
public Pair<Long, Long> publishComplete(int mqtt) {
return publishAckd(mqtt);
}
public void clear() {
synchronized (dataStoreLock) {
artemisToMqttMessageMap.clear();
mqttToServerIds.clear();
ids.set(0);
}
}
}
}

View File

@ -74,19 +74,13 @@ public class MQTTSubscriptionManager {
}
}
synchronized void stop(boolean clean) throws Exception {
synchronized void stop() throws Exception {
for (ServerConsumer consumer : consumers.values()) {
consumer.setStarted(false);
consumer.disconnect();
consumer.getQueue().removeConsumer(consumer);
consumer.close(false);
}
if (clean) {
for (ServerConsumer consumer : consumers.values()) {
session.getServer().destroyQueue(consumer.getQueue().getName());
}
}
}
/**
@ -192,15 +186,20 @@ public class MQTTSubscriptionManager {
// FIXME: Do we need this synchronzied?
private synchronized void removeSubscription(String address) throws Exception {
ServerConsumer consumer = consumers.get(address);
String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
Queue queue = session.getServer().locateQueue(internalQueueName);
queue.deleteQueue(true);
session.getSessionState().removeSubscription(address);
ServerConsumer consumer = consumers.get(address);
consumers.remove(address);
consumerQoSLevels.remove(consumer.getID());
if (consumer != null) {
consumer.removeItself();
consumerQoSLevels.remove(consumer.getID());
}
if (session.getServerSession().executeQueueQuery(internalQueueName).isExists()) {
session.getServerSession().deleteQueue(internalQueueName);
}
}
private SimpleString getQueueNameForTopic(String topic) {
@ -228,4 +227,9 @@ public class MQTTSubscriptionManager {
return consumerQoSLevels;
}
void clean() throws Exception {
for (MqttTopicSubscription mqttTopicSubscription : session.getSessionState().getSubscriptions()) {
removeSubscription(mqttTopicSubscription.topicName());
}
}
}

View File

@ -43,7 +43,7 @@ public class MQTTUtil {
public static final boolean SESSION_AUTO_COMMIT_SENDS = true;
public static final boolean SESSION_AUTO_COMMIT_ACKS = false;
public static final boolean SESSION_AUTO_COMMIT_ACKS = true;
public static final boolean SESSION_PREACKNOWLEDGE = false;

View File

@ -273,6 +273,7 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
}
@Ignore
@Test(timeout = 600 * 1000)
public void testSendMoreThanUniqueId() throws Exception {
int messages = (Short.MAX_VALUE * 2) + 1;

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@ -142,6 +143,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
private ActiveMQServer createServerForMQTT() throws Exception {
Configuration defaultConfig = createDefaultConfig(true).setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())).setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString("DLA"));
addressSettings.setExpiryAddress(SimpleString.toSimpleString("EXPIRY"));
defaultConfig.getAddressesSettings().put("#", addressSettings);
return createServer(true, defaultConfig);
}