resolve issue with kahadb durable subs with selectors after restart, persist the ack locations, kahadb version to 3 with auto upgrade from 1 or 2. https://issues.apache.org/activemq/browse/AMQ-2985

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1036524 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-18 16:44:50 +00:00
parent fe3660dfcd
commit 3f0cf98407
8 changed files with 256 additions and 85 deletions

View File

@ -53,8 +53,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
clear();
super.start();
resetBatch();
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;

View File

@ -928,7 +928,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
ActiveMQDestination dest = convert(entry.getKey());
if (dest.isTopic()) {
StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
if (loadedStore.ackPositions.isEmpty()) {
if (loadedStore.ackPositions.isEmpty(tx)) {
isEmptyTopic = true;
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.store.kahadb;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
@ -100,7 +101,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
static final long NOT_ACKED = -1;
static final long UNMATCHED_SEQ = -2;
static final int VERSION = 2;
static final int VERSION = 3;
protected class Metadata {
@ -165,9 +166,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} else {
os.writeBoolean(false);
}
if (version > 1) {
os.writeInt(version);
}
os.writeInt(VERSION);
}
}
@ -255,8 +254,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
metadata.destinations.load(tx);
}
});
pageFile.flush();
// Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
// Perhaps we should just keep an index of file
storedDestinations.clear();
@ -269,6 +266,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
});
pageFile.flush();
}finally {
this.indexLock.writeLock().unlock();
}
@ -985,7 +983,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
// Skip adding the message to the index if this is a topic and there are
// no subscriptions.
if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
if (sd.subscriptions != null && sd.ackPositions.isEmpty(tx)) {
return;
}
@ -1055,18 +1053,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
if (ackSequenceToStore != sequence) {
// unmatched, need to add ack locations for the intermediate sequences
for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
addAckLocation(sd, matchedGapSequence, subscriptionKey);
if (prev != null) {
if (ackSequenceToStore != sequence) {
// unmatched, need to add ack locations for the intermediate sequences
for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
addAckLocation(tx, sd, matchedGapSequence, subscriptionKey);
}
}
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
}
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
// Add it to the new location set.
addAckLocation(sd, sequence, subscriptionKey);
addAckLocation(tx, sd, sequence, subscriptionKey);
}
}
@ -1107,6 +1106,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
sd.subscriptionAcks.clear(tx);
sd.subscriptionAcks.unload(tx);
tx.free(sd.subscriptionAcks.getPageId());
sd.ackPositions.clear(tx);
sd.ackPositions.unload(tx);
tx.free(sd.ackPositions.getPageId());
}
String key = key(command.getDestination());
@ -1127,7 +1130,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
addAckLocation(sd, ackLocation, subscriptionKey);
addAckLocation(tx, sd, ackLocation, subscriptionKey);
} else {
// delete the sub...
String subscriptionKey = command.getSubscriptionKey();
@ -1326,13 +1329,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
BTreeIndex<String, Long> subscriptionAcks;
HashMap<String, MessageOrderCursor> subscriptionCursors;
TreeMap<Long, HashSet<String>> ackPositions;
BTreeIndex<Long, HashSet<String>> ackPositions;
}
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
public StoredDestination readPayload(DataInput dataIn) throws IOException {
StoredDestination value = new StoredDestination();
final StoredDestination value = new StoredDestination();
value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
@ -1340,11 +1343,40 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (dataIn.readBoolean()) {
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
if (metadata.version >= 3) {
value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
value.ackPositions.load(tx);
}
});
}
}
if (metadata.version >= 2) {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
value.orderIndex.lowPriorityIndex.load(tx);
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
value.orderIndex.highPriorityIndex.load(tx);
}
});
}
return value;
}
@ -1356,13 +1388,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
dataOut.writeBoolean(true);
dataOut.writeLong(value.subscriptions.getPageId());
dataOut.writeLong(value.subscriptionAcks.getPageId());
dataOut.writeLong(value.ackPositions.getPageId());
} else {
dataOut.writeBoolean(false);
}
if (metadata.version >= 2) {
dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
}
dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
}
}
@ -1452,6 +1483,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
if (topic) {
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
}
metadata.destinations.put(tx, key, rc);
}
@ -1481,18 +1513,24 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
rc.subscriptionAcks.load(tx);
rc.ackPositions = new TreeMap<Long, HashSet<String>>();
rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
rc.ackPositions.load(tx);
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
Entry<String, Long> entry = iterator.next();
addAckLocation(rc, extractSequenceId(entry.getValue()), entry.getKey());
if (metadata.version < 3) {
// on upgrade need to fill ackLocation
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
Entry<String, Long> entry = iterator.next();
addAckLocation(tx, rc, extractSequenceId(entry.getValue()), entry.getKey());
}
}
if (rc.orderIndex.nextMessageId == 0) {
// check for existing durable sub all acked out - pull next seq from acks as messages are gone
if (!rc.ackPositions.isEmpty()) {
Long lastAckedMessageId = rc.ackPositions.lastKey();
if (!rc.ackPositions.isEmpty(tx)) {
Long lastAckedMessageId = rc.ackPositions.getLast(tx).getKey();
if (lastAckedMessageId != NOT_ACKED) {
rc.orderIndex.nextMessageId = lastAckedMessageId+1;
}
@ -1500,6 +1538,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
if (metadata.version < 3) {
// store again after upgrade
metadata.destinations.put(tx, key, rc);
}
return rc;
}
@ -1508,13 +1551,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
* @param messageSequence
* @param subscriptionKey
*/
private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) {
HashSet<String> hs = sd.ackPositions.get(messageSequence);
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
if (hs == null) {
hs = new HashSet<String>();
sd.ackPositions.put(messageSequence, hs);
}
hs.add(subscriptionKey);
// every ack location addition needs to be a btree modification to get it stored
sd.ackPositions.put(tx, messageSequence, hs);
}
/**
@ -1527,12 +1571,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
// Remove the sub from the previous location set..
if (sequenceId != null) {
HashSet<String> hs = sd.ackPositions.get(sequenceId);
HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
if (hs != null) {
hs.remove(subscriptionKey);
if (hs.isEmpty()) {
HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
sd.ackPositions.remove(sequenceId);
HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
sd.ackPositions.remove(tx, sequenceId);
// Did we just empty out the first set in the
// ordered list of ack locations? Then it's time to
@ -1942,15 +1986,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
defaultPriorityIndex.load(tx);
if (metadata.version >= 2) {
lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
lowPriorityIndex.load(tx);
highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
highPriorityIndex.load(tx);
}
lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
lowPriorityIndex.load(tx);
highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
highPriorityIndex.load(tx);
}
void allocate(Transaction tx) throws IOException {
@ -2193,6 +2234,33 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
}
}
private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
oout.writeObject(object);
oout.flush();
oout.close();
byte[] data = baos.toByteArray();
dataOut.writeInt(data.length);
dataOut.write(data);
}
public HashSet<String> readPayload(DataInput dataIn) throws IOException {
int dataLen = dataIn.readInt();
byte[] data = new byte[dataLen];
dataIn.readFully(data);
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream oin = new ObjectInputStream(bais);
try {
return (HashSet<String>) oin.readObject();
} catch (ClassNotFoundException cfe) {
IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
ioe.initCause(cfe);
throw ioe;
}
}
}
}

View File

@ -34,6 +34,7 @@ import java.io.FileNotFoundException;
public class KahaDBVersionTest extends TestCase {
final static File VERSION_1_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
final static File VERSION_2_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
BrokerService broker = new BrokerService();
@ -47,21 +48,28 @@ public class KahaDBVersionTest extends TestCase {
public void XtestCreateStore() throws Exception {
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX");
IOHelper.deleteFile(dir);
kaha.setDirectory(dir);
kaha.setJournalMaxFileLength(1024*1024);
BrokerService broker = createBroker(kaha);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.setClientID("test");
connection.start();
producerSomeMessages(connection);
connection.close();
broker.stop();
}
private void producerSomeMessages(Connection connection) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
Queue queue = session.createQueue("test.queue");
MessageConsumer consumer = session.createDurableSubscriber(topic,"test");
consumer.close();
MessageProducer producer = session.createProducer(topic);
producer.setPriority(9);
for (int i =0; i < 1000; i++) {
Message msg = session.createTextMessage("test message:"+i);
producer.send(msg);
@ -71,45 +79,56 @@ public class KahaDBVersionTest extends TestCase {
Message msg = session.createTextMessage("test message:"+i);
producer.send(msg);
}
connection.stop();
broker.stop();
}
public void testVersionConversion() throws Exception{
public void testVersion1Conversion() throws Exception{
doConvertRestartCycle(VERSION_1_DB);
}
public void testVersion2Conversion() throws Exception{
doConvertRestartCycle(VERSION_2_DB);
}
public void doConvertRestartCycle(File existingStore) throws Exception {
File testDir = new File("target/activemq-data/kahadb/versionDB");
IOHelper.deleteFile(testDir);
IOHelper.copyFile(VERSION_1_DB, testDir);
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setDirectory(testDir);
kaha.setJournalMaxFileLength(1024*1024);
BrokerService broker = createBroker(kaha);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.setClientID("test");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
Queue queue = session.createQueue("test.queue");
MessageConsumer queueConsumer = session.createConsumer(queue);
for (int i = 0; i < 1000; i++) {
TextMessage msg = (TextMessage) queueConsumer.receive(10000);
//System.err.println(msg.getText());
assertNotNull(msg);
IOHelper.copyFile(existingStore, testDir);
// on repeat store will be upgraded
for (int repeats = 0; repeats < 3; repeats++) {
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setDirectory(testDir);
kaha.setJournalMaxFileLength(1024 * 1024);
BrokerService broker = createBroker(kaha);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = cf.createConnection();
connection.setClientID("test");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test.topic");
Queue queue = session.createQueue("test.queue");
if (repeats > 0) {
// upgraded store will be empty so generated some more messages
producerSomeMessages(connection);
}
MessageConsumer queueConsumer = session.createConsumer(queue);
for (int i = 0; i < 1000; i++) {
TextMessage msg = (TextMessage) queueConsumer.receive(10000);
//System.err.println(msg.getText());
assertNotNull(msg);
}
MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
for (int i = 0; i < 1000; i++) {
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
//System.err.println(msg.getText());
assertNotNull(msg);
}
connection.close();
broker.stop();
}
MessageConsumer topicConsumer = session.createDurableSubscriber(topic,"test");
for (int i = 0; i < 1000; i++) {
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
//System.err.println(msg.getText());
assertNotNull(msg);
}
broker.stop();
}
}

View File

@ -617,6 +617,90 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(0, listener.count);
}
public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
// create offline subs 1
Connection con = createConnection("offCli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
// create offline subs 2
con = createConnection("offCli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
session.close();
con.close();
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int filtered = 0;
for (int i = 0; i < 10; i++) {
boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
if (filter)
filtered++;
Message message = session.createMessage();
message.setStringProperty("filter", filter ? "true" : "false");
producer.send(topic, message);
}
LOG.info("sent: " + filtered);
Thread.sleep(1 * 1000);
session.close();
con.close();
// restart broker
Thread.sleep(3 * 1000);
broker.stop();
createBroker(false /*deleteAllMessages*/);
// send more messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
for (int i = 0; i < 10; i++) {
boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
if (filter)
filtered++;
Message message = session.createMessage();
message.setStringProperty("filter", filter ? "true" : "false");
producer.send(topic, message);
}
LOG.info("after restart, sent: " + filtered);
Thread.sleep(1 * 1000);
session.close();
con.close();
// test offline subs
con = createConnection("offCli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener = new Listener();
consumer.setMessageListener(listener);
Connection con3 = createConnection("offCli2");
Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
Listener listener3 = new Listener();
consumer3.setMessageListener(listener3);
Thread.sleep(3 * 1000);
session.close();
con.close();
session3.close();
con3.close();
assertEquals(filtered, listener.count);
assertEquals(filtered, listener3.count);
}
public static class Listener implements MessageListener {
int count = 0;