diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 0ff203cd54..ef8fe0a396 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -248,7 +248,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private boolean checksumJournalFiles = true; protected boolean forceRecoverIndex = false; private final Object checkpointThreadLock = new Object(); - private boolean rewriteOnRedelivery = false; private boolean archiveCorruptedIndex = false; private boolean useIndexLFRUEviction = false; private float indexLFUEvictionFactor = 0.2f; @@ -1161,7 +1160,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - @SuppressWarnings("rawtypes") protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { this.indexLock.writeLock().lock(); try { @@ -2153,7 +2151,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe SequenceSet pendingAcks = subscription.getValue(); if (pendingAcks != null && !pendingAcks.isEmpty()) { Long lastPendingAck = pendingAcks.getTail().getLast(); - for(Long sequenceId : pendingAcks) { + for (Long sequenceId : pendingAcks) { Long current = rc.messageReferences.get(sequenceId); if (current == null) { current = new Long(0); @@ -2163,6 +2161,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // so we need to ensure we don't count that as a message reference on reload. if (!sequenceId.equals(lastPendingAck)) { current = current.longValue() + 1; + } else { + current = Long.valueOf(0L); } rc.messageReferences.put(sequenceId, current); @@ -2235,8 +2235,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe for (Long ackPosition : allOutstanding) { Long count = sd.messageReferences.get(ackPosition); - count = count.longValue() + 1; - sd.messageReferences.put(ackPosition, count); + + // There might not be a reference if the ackLocation was the last + // one which is a placeholder for the next incoming message and + // no value was added to the message references table. + if (count != null) { + count = count.longValue() + 1; + sd.messageReferences.put(ackPosition, count); + } } } @@ -2259,7 +2265,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } count = count.longValue() + 1; sd.messageReferences.put(messageSequence, count); - sd.messageReferences.put(messageSequence+1, Long.valueOf(0L)); + sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L)); } } @@ -2322,8 +2328,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Check if the message is reference by any other subscription. Long count = sd.messageReferences.get(messageSequence); - if (count != null){ - long references = count.longValue() - 1; + if (count != null) { + long references = count.longValue() - 1; if (references > 0) { sd.messageReferences.put(messageSequence, Long.valueOf(references)); return; @@ -3050,7 +3056,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - class MessageOrderIterator implements Iterator>{ Iterator>currentIterator; final Iterator>highIterator; @@ -3145,7 +3150,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void remove() { throw new UnsupportedOperationException(); } - } } @@ -3209,5 +3213,4 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void setPreallocationStrategy(String preallocationStrategy) { this.preallocationStrategy = preallocationStrategy; } - } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java index 029de93c21..e5282b31ad 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java @@ -68,7 +68,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport { LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize); MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(getName()); + mqtt.setClientId(getTestName()); mqtt.setKeepAlive((short) 10); mqtt.setVersion("3.1.1"); @@ -97,7 +97,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport { LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize); MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(getName()); + mqtt.setClientId(getTestName()); mqtt.setKeepAlive((short) 10); mqtt.setVersion("3.1.1"); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java new file mode 100644 index 0000000000..33fb61b44b --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class MQTTOverlapedSubscriptionsTest { + + private BrokerService brokerService; + private String mqttClientUrl; + + @Before + public void setup() throws Exception { + initializeBroker(true); + } + + @After + public void shutdown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + protected void initializeBroker(boolean deleteAllMessagesOnStart) throws Exception { + + brokerService = new BrokerService(); + brokerService.setPersistent(true); + brokerService.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStart); + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI("mqtt://localhost:0")); + connector.setName("mqtt"); + brokerService.addConnector(connector); + brokerService.start(); + brokerService.waitUntilStarted(); + + mqttClientUrl = connector.getPublishableConnectString().replace("mqtt", "tcp"); + } + + @Test + public void testMqttResubscribe() throws Exception { + // inactive durable consumer on test/1 will be left on the broker after restart + doTest("test/1"); + + shutdown(); + initializeBroker(false); + + // new consumer on test/# will match all messages sent to the inactive sub + doTest("test/#"); + } + + private BlockingConnection getConnection(String host, String clientId) throws URISyntaxException, Exception { + BlockingConnection conn; + MQTT mqttPub = new MQTT(); + mqttPub.setHost(host); + mqttPub.setConnectAttemptsMax(0); + mqttPub.setReconnectAttemptsMax(0); + mqttPub.setClientId(clientId); + mqttPub.setCleanSession(false); + conn = mqttPub.blockingConnection(); + conn.connect(); + return conn; + } + + public void doTest(String subscribe) throws Exception { + String payload = "This is test payload"; + BlockingConnection connectionPub = getConnection(mqttClientUrl, "client1"); + BlockingConnection connectionSub = getConnection(mqttClientUrl, "client2"); + Topic[] topics = { new Topic(subscribe, QoS.values()[1]) }; + connectionSub.subscribe(topics); + connectionPub.publish("test/1", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + receive(connectionSub, 3000); + + //Unsubscribe and resubscribe + connectionSub.unsubscribe(new String[]{subscribe}); + connectionSub.subscribe(topics); + connectionPub.publish(subscribe, payload.getBytes(), QoS.AT_LEAST_ONCE, false); + receive(connectionSub, 3000); + + connectionPub.disconnect(); + connectionSub.disconnect(); + } + + public byte[] receive(BlockingConnection connection, int timeout) throws Exception { + byte[] result = null; + org.fusesource.mqtt.client.Message message = connection.receive(timeout, TimeUnit.MILLISECONDS); + if (message != null) { + result = message.getPayload(); + message.ack(); + } + return result; + } +} diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java index 61f74cc95f..0b7f958179 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java @@ -72,7 +72,7 @@ public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport { @Test public void testDurableSubscriptionsAreRecovered() throws Exception { - MqttClient connection = createClient(getName()); + MqttClient connection = createClient(getTestName()); final String[] topics = { "TopicA/", "TopicB/", "TopicC/" }; for (int i = 0; i < topics.length; i++) { @@ -90,7 +90,7 @@ public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport { assertStatsForDisconnectedClient(topics.length); - connection = createClient(getName()); + connection = createClient(getTestName()); assertStatsForConnectedClient(topics.length); } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java index 18aee542c1..0b58687637 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java @@ -44,6 +44,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Tracer; @@ -59,6 +60,8 @@ public class MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class); + public static final String KAHADB_DIRECTORY = "target/activemq-data/"; + protected BrokerService brokerService; protected int port; protected String jmsUri = "vm://localhost"; @@ -90,7 +93,7 @@ public class MQTTTestSupport { this.useSSL = useSSL; } - public String getName() { + public String getTestName() { return name.getMethodName(); } @@ -144,6 +147,11 @@ public class MQTTTestSupport { BrokerService brokerService = new BrokerService(); brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); brokerService.setPersistent(isPersistent()); + if (isPersistent()) { + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName())); + brokerService.setPersistenceAdapter(kaha); + } brokerService.setAdvisorySupport(false); brokerService.setUseJmx(true); brokerService.getManagementContext().setCreateConnector(false);