mirror of https://github.com/apache/activemq.git
AMQ-6642: Fix potential NPE on updateMessage
Fixing potential NPE when calling updateMessage on a Queue store in KahaDB if subscription statistics are enabled. Also reduced the visibily from public to protected to subscription statistic related methods that shouldn't be public and added null pointer checks there as well. https://issues.apache.org/jira/browse/AMQ-6642
This commit is contained in:
parent
4ef1fc74cf
commit
2731f04f1c
|
@ -1524,7 +1524,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
|
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
|
||||||
|
|
||||||
//update all the subscription metrics
|
//update all the subscription metrics
|
||||||
if (enableSubscriptionStatistics && location.getSize() != previousKeys.location.getSize()) {
|
if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
|
||||||
Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
|
Iterator<Entry<String, SequenceSet>> iter = sd.ackPositions.iterator(tx);
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
Entry<String, SequenceSet> e = iter.next();
|
Entry<String, SequenceSet> e = iter.next();
|
||||||
|
@ -2970,22 +2970,26 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
return sd.subscriptionAcks.get(tx, subscriptionKey);
|
return sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
||||||
|
if (sd.ackPositions != null) {
|
||||||
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
|
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
|
||||||
if (messageSequences != null) {
|
if (messageSequences != null) {
|
||||||
long result = messageSequences.rangeSize();
|
long result = messageSequences.rangeSize();
|
||||||
// if there's anything in the range the last value is always the nextMessage marker, so remove 1.
|
// if there's anything in the range the last value is always the nextMessage marker, so remove 1.
|
||||||
return result > 0 ? result - 1 : 0;
|
return result > 0 ? result - 1 : 0;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
||||||
|
long locationSize = 0;
|
||||||
|
|
||||||
|
if (sd.ackPositions != null) {
|
||||||
//grab the messages attached to this subscription
|
//grab the messages attached to this subscription
|
||||||
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
|
SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
|
||||||
|
|
||||||
long locationSize = 0;
|
|
||||||
if (messageSequences != null) {
|
if (messageSequences != null) {
|
||||||
Sequence head = messageSequences.getHead();
|
Sequence head = messageSequences.getHead();
|
||||||
if (head != null) {
|
if (head != null) {
|
||||||
|
@ -3000,6 +3004,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return locationSize;
|
return locationSize;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,12 @@
|
||||||
package org.apache.activemq.store.kahadb;
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
@ -36,13 +39,27 @@ import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TemporaryFolder;
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class MessageDatabaseSizeTest {
|
public class MessageDatabaseSizeTest {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
|
private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
|
||||||
|
|
||||||
|
@Parameters(name = "subStatsEnabled={0}")
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
// Subscription stats on
|
||||||
|
{true},
|
||||||
|
// Subscription stats off
|
||||||
|
{false}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
|
public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
|
||||||
private final String payload = new String(new byte[1024]);
|
private final String payload = new String(new byte[1024]);
|
||||||
|
@ -50,6 +67,13 @@ public class MessageDatabaseSizeTest {
|
||||||
private BrokerService broker = null;
|
private BrokerService broker = null;
|
||||||
private final ActiveMQQueue destination = new ActiveMQQueue("Test");
|
private final ActiveMQQueue destination = new ActiveMQQueue("Test");
|
||||||
private KahaDBPersistenceAdapter adapter;
|
private KahaDBPersistenceAdapter adapter;
|
||||||
|
private boolean subStatsEnabled;
|
||||||
|
|
||||||
|
public MessageDatabaseSizeTest(boolean subStatsEnabled) {
|
||||||
|
super();
|
||||||
|
this.subStatsEnabled = subStatsEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void startBroker() throws Exception {
|
protected void startBroker() throws Exception {
|
||||||
broker = new BrokerService();
|
broker = new BrokerService();
|
||||||
|
@ -58,6 +82,7 @@ public class MessageDatabaseSizeTest {
|
||||||
broker.setUseJmx(true);
|
broker.setUseJmx(true);
|
||||||
broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
|
broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
|
||||||
adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
||||||
|
adapter.setEnableSubscriptionStatistics(subStatsEnabled);
|
||||||
broker.start();
|
broker.start();
|
||||||
LOG.info("Starting broker..");
|
LOG.info("Starting broker..");
|
||||||
}
|
}
|
||||||
|
@ -101,6 +126,22 @@ public class MessageDatabaseSizeTest {
|
||||||
assertEquals(existingSize, messageStore.getMessageSize());
|
assertEquals(existingSize, messageStore.getMessageSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateMessageSameLocationDifferentSize() throws Exception {
|
||||||
|
final KahaDBStore store = adapter.getStore();
|
||||||
|
MessageId messageId = new MessageId("111:222:333");
|
||||||
|
ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
|
||||||
|
|
||||||
|
//Add a single message and update once so we can compare the size consistently
|
||||||
|
MessageStore messageStore = store.createQueueMessageStore(destination);
|
||||||
|
messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
|
||||||
|
textMessage.setText("new size of message");
|
||||||
|
messageStore.updateMessage(textMessage);
|
||||||
|
|
||||||
|
assertNotNull(findMessageLocation(messageId.toString(), store.convert(destination)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that when updating an existing message to a different location in the
|
* Test that when updating an existing message to a different location in the
|
||||||
* journal that the index size doesn't change
|
* journal that the index size doesn't change
|
||||||
|
|
Loading…
Reference in New Issue