The code that adds subscriptions for retroactive subscribers did not
account for the case where the last ack position in the ackLocations
table is used as a placeholder for the next incoming message and there
may not be an element in the messageReference tracker for that index
especially after restart when the index is reloaded.  The code needed to
check this when it iterates of the existing message references to add a
value so to avoid the NPE.  

Also cleaned up the MQTT tests such that they place their data dir in
./target so that old stores get removed on 'mvn clean'
This commit is contained in:
Timothy Bish 2015-07-02 17:04:35 -04:00
parent 8e7556f397
commit 455f1ca475
5 changed files with 144 additions and 16 deletions

View File

@ -248,7 +248,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
private boolean rewriteOnRedelivery = false;
private boolean archiveCorruptedIndex = false;
private boolean useIndexLFRUEviction = false;
private float indexLFUEvictionFactor = 0.2f;
@ -1161,7 +1160,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@SuppressWarnings("rawtypes")
protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
this.indexLock.writeLock().lock();
try {
@ -2153,7 +2151,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
SequenceSet pendingAcks = subscription.getValue();
if (pendingAcks != null && !pendingAcks.isEmpty()) {
Long lastPendingAck = pendingAcks.getTail().getLast();
for(Long sequenceId : pendingAcks) {
for (Long sequenceId : pendingAcks) {
Long current = rc.messageReferences.get(sequenceId);
if (current == null) {
current = new Long(0);
@ -2163,6 +2161,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// so we need to ensure we don't count that as a message reference on reload.
if (!sequenceId.equals(lastPendingAck)) {
current = current.longValue() + 1;
} else {
current = Long.valueOf(0L);
}
rc.messageReferences.put(sequenceId, current);
@ -2235,8 +2235,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
for (Long ackPosition : allOutstanding) {
Long count = sd.messageReferences.get(ackPosition);
count = count.longValue() + 1;
sd.messageReferences.put(ackPosition, count);
// There might not be a reference if the ackLocation was the last
// one which is a placeholder for the next incoming message and
// no value was added to the message references table.
if (count != null) {
count = count.longValue() + 1;
sd.messageReferences.put(ackPosition, count);
}
}
}
@ -2259,7 +2265,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
}
}
@ -2322,8 +2328,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Check if the message is reference by any other subscription.
Long count = sd.messageReferences.get(messageSequence);
if (count != null){
long references = count.longValue() - 1;
if (count != null) {
long references = count.longValue() - 1;
if (references > 0) {
sd.messageReferences.put(messageSequence, Long.valueOf(references));
return;
@ -3050,7 +3056,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
Iterator<Entry<Long, MessageKeys>>currentIterator;
final Iterator<Entry<Long, MessageKeys>>highIterator;
@ -3145,7 +3150,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void remove() {
throw new UnsupportedOperationException();
}
}
}
@ -3209,5 +3213,4 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void setPreallocationStrategy(String preallocationStrategy) {
this.preallocationStrategy = preallocationStrategy;
}
}

View File

@ -68,7 +68,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(getName());
mqtt.setClientId(getTestName());
mqtt.setKeepAlive((short) 10);
mqtt.setVersion("3.1.1");
@ -97,7 +97,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(getName());
mqtt.setClientId(getTestName());
mqtt.setKeepAlive((short) 10);
mqtt.setVersion("3.1.1");

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class MQTTOverlapedSubscriptionsTest {
private BrokerService brokerService;
private String mqttClientUrl;
@Before
public void setup() throws Exception {
initializeBroker(true);
}
@After
public void shutdown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
protected void initializeBroker(boolean deleteAllMessagesOnStart) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStart);
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("mqtt://localhost:0"));
connector.setName("mqtt");
brokerService.addConnector(connector);
brokerService.start();
brokerService.waitUntilStarted();
mqttClientUrl = connector.getPublishableConnectString().replace("mqtt", "tcp");
}
@Test
public void testMqttResubscribe() throws Exception {
// inactive durable consumer on test/1 will be left on the broker after restart
doTest("test/1");
shutdown();
initializeBroker(false);
// new consumer on test/# will match all messages sent to the inactive sub
doTest("test/#");
}
private BlockingConnection getConnection(String host, String clientId) throws URISyntaxException, Exception {
BlockingConnection conn;
MQTT mqttPub = new MQTT();
mqttPub.setHost(host);
mqttPub.setConnectAttemptsMax(0);
mqttPub.setReconnectAttemptsMax(0);
mqttPub.setClientId(clientId);
mqttPub.setCleanSession(false);
conn = mqttPub.blockingConnection();
conn.connect();
return conn;
}
public void doTest(String subscribe) throws Exception {
String payload = "This is test payload";
BlockingConnection connectionPub = getConnection(mqttClientUrl, "client1");
BlockingConnection connectionSub = getConnection(mqttClientUrl, "client2");
Topic[] topics = { new Topic(subscribe, QoS.values()[1]) };
connectionSub.subscribe(topics);
connectionPub.publish("test/1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
receive(connectionSub, 3000);
//Unsubscribe and resubscribe
connectionSub.unsubscribe(new String[]{subscribe});
connectionSub.subscribe(topics);
connectionPub.publish(subscribe, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
receive(connectionSub, 3000);
connectionPub.disconnect();
connectionSub.disconnect();
}
public byte[] receive(BlockingConnection connection, int timeout) throws Exception {
byte[] result = null;
org.fusesource.mqtt.client.Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
if (message != null) {
result = message.getPayload();
message.ack();
}
return result;
}
}

View File

@ -72,7 +72,7 @@ public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {
@Test
public void testDurableSubscriptionsAreRecovered() throws Exception {
MqttClient connection = createClient(getName());
MqttClient connection = createClient(getTestName());
final String[] topics = { "TopicA/", "TopicB/", "TopicC/" };
for (int i = 0; i < topics.length; i++) {
@ -90,7 +90,7 @@ public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {
assertStatsForDisconnectedClient(topics.length);
connection = createClient(getName());
connection = createClient(getTestName());
assertStatsForConnectedClient(topics.length);
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
@ -59,6 +60,8 @@ public class MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
public static final String KAHADB_DIRECTORY = "target/activemq-data/";
protected BrokerService brokerService;
protected int port;
protected String jmsUri = "vm://localhost";
@ -90,7 +93,7 @@ public class MQTTTestSupport {
this.useSSL = useSSL;
}
public String getName() {
public String getTestName() {
return name.getMethodName();
}
@ -144,6 +147,11 @@ public class MQTTTestSupport {
BrokerService brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setPersistent(isPersistent());
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);