From 3bf9d0ccebbffc6a726e6251e78b88d5c240e58f Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 15 Apr 2013 18:08:15 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-4212 fix for: https://issues.apache.org/jira/browse/AMQ-2832 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1468171 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/kahadb/MessageDatabase.java | 226 +++++++++-- .../apache/activemq/store/kahadb/Visitor.java | 5 +- .../src/main/proto/journal-data.proto | 23 +- .../org/apache/activemq/bugs/AMQ2832Test.java | 182 +++++++-- .../org/apache/activemq/bugs/AMQ4212Test.java | 358 ++++++++++++++++++ .../store/kahadb/KahaDBVersionTest.java | 79 ++-- ...SelectorConcurrentConsumeIndexUseTest.java | 14 +- 7 files changed, 783 insertions(+), 104 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 047c9e337d..8fd78feda9 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -54,9 +54,9 @@ import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.protobuf.Buffer; +import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; @@ -85,7 +85,12 @@ import org.apache.activemq.store.kahadb.disk.util.Sequence; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.apache.activemq.util.*; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.DataByteArrayInputStream; +import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,13 +106,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe UNMATCHED = new Buffer(new byte[]{}); } private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); - private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; static final int CLOSED_STATE = 1; static final int OPEN_STATE = 2; static final long NOT_ACKED = -1; - static final int VERSION = 4; + static final int VERSION = 5; protected class Metadata { protected Page page; @@ -116,7 +120,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected Location lastUpdate; protected Location firstInProgressTransactionLocation; protected Location producerSequenceIdTrackerLocation = null; + protected Location ackMessageFileMapLocation = null; protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); + protected transient Map> ackMessageFileMap = new HashMap>(); protected int version = VERSION; public void read(DataInput is) throws IOException { state = is.readInt(); @@ -144,6 +150,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } catch (EOFException expectedOnUpgrade) { version=1; } + if (version >= 5 && is.readBoolean()) { + ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); + } else { + ackMessageFileMapLocation = null; + } LOG.info("KahaDB is version " + version); } @@ -172,6 +183,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe os.writeBoolean(false); } os.writeInt(VERSION); + if (ackMessageFileMapLocation != null) { + os.writeBoolean(true); + LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); + } else { + os.writeBoolean(false); + } } } @@ -452,6 +469,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return range; } + @SuppressWarnings("rawtypes") private void trackMaxAndMin(Location[] range, List ops) { Location t = ops.get(0).getLocation(); if (range[0]==null || t.compareTo(range[0]) <= 0) { @@ -473,6 +491,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } HashMap destinationOpCount = new HashMap(); + @SuppressWarnings("rawtypes") public void track(Operation operation) { if (location == null ) { location = operation.getLocation(); @@ -543,9 +562,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe long start = System.currentTimeMillis(); Location producerAuditPosition = recoverProducerAudit(); + Location ackMessageFileLocation = recoverAckMessageFileMap(); Location lastIndoubtPosition = getRecoveryPosition(); - Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); + Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); + recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); if (recoveryPosition != null) { int redoCounter = 0; @@ -631,6 +652,24 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + @SuppressWarnings("unchecked") + private Location recoverAckMessageFileMap() throws IOException { + if (metadata.ackMessageFileMapLocation != null) { + KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); + try { + ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); + metadata.ackMessageFileMap = (Map>) objectIn.readObject(); + return journal.getNextLocation(metadata.ackMessageFileMapLocation); + } catch (Exception e) { + LOG.warn("Cannot recover ackMessageFileMap", e); + return journal.getNextLocation(null); + } + } else { + // got no ackMessageFileMap stored so got to recreate via replay from start of the journal + return journal.getNextLocation(null); + } + } + protected void recoverIndex(Transaction tx) throws IOException { long start = System.currentTimeMillis(); // It is possible index updates got applied before the journal updates.. @@ -760,7 +799,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe undoCounter++; // TODO: do we need to modify the ack positions for the pub sub case? } - } else { throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); } @@ -947,6 +985,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe byte readByte = is.readByte(); KahaEntryType type = KahaEntryType.valueOf(readByte); if( type == null ) { + try { + is.close(); + } catch (IOException e) {} throw new IOException("Could not load journal record. Invalid location: "+location); } JournalCommand message = (JournalCommand)type.createMessage(); @@ -1023,6 +1064,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe processLocation(location); } + @Override + public void visit(KahaAckMessageFileMapCommand command) throws IOException { + processLocation(location); + } + @Override public void visit(KahaTraceCommand command) { processLocation(location); @@ -1223,20 +1269,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } else { // If the message ID as indexed, then the broker asked us to - // store a DUP - // message. Bad BOY! Don't do it, and log a warning. + // store a DUP message. Bad BOY! Don't do it, and log a warning. LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId()); sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); rollbackStatsOnDuplicate(command.getDestination()); } } else { - // restore the previous value.. Looks like this was a redo of a - // previously - // added message. We don't want to assign it a new id as the other - // indexes would + // restore the previous value.. Looks like this was a redo of a previously + // added message. We don't want to assign it a new id as the other indexes would // be wrong.. - // sd.locationIndex.put(tx, location, previous); } // record this id in any event, initial send or recovery @@ -1276,6 +1318,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe byte priority = sd.orderIndex.lastGetPriority(); sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); } + + MessageKeys keys = sd.orderIndex.get(tx, sequence); + if (keys != null) { + recordAckMessageReferenceLocation(ackLocation, keys.location); + } // The following method handles deleting un-referenced messages. removeAckLocation(tx, sd, subscriptionKey, sequence); } else if (LOG.isDebugEnabled()) { @@ -1286,13 +1333,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.lastUpdate = ackLocation; } - Map> ackMessageFileMap = new HashMap>(); private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { - Set referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); + Set referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); if (referenceFileIds == null) { referenceFileIds = new HashSet(); referenceFileIds.add(messageLocation.getDataFileId()); - ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); + metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); } else { Integer id = Integer.valueOf(messageLocation.getDataFileId()); if (!referenceFileIds.contains(id)) { @@ -1325,6 +1371,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.ackPositions.clear(tx); sd.ackPositions.unload(tx); tx.free(sd.ackPositions.getHeadPageId()); + + sd.subLocations.clear(tx); + sd.subLocations.unload(tx); + tx.free(sd.subLocations.getHeadPageId()); } String key = key(command.getDestination()); @@ -1339,6 +1389,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // If set then we are creating it.. otherwise we are destroying the sub if (command.hasSubscriptionInfo()) { sd.subscriptions.put(tx, subscriptionKey, command); + sd.subLocations.put(tx, subscriptionKey, location); long ackLocation=NOT_ACKED; if (!command.getRetroactive()) { ackLocation = sd.orderIndex.nextMessageId-1; @@ -1350,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } else { // delete the sub... sd.subscriptions.remove(tx, subscriptionKey); + sd.subLocations.remove(tx, subscriptionKey); sd.subscriptionAcks.remove(tx, subscriptionKey); sd.subscriptionCache.remove(subscriptionKey); removeAckLocationsForSub(tx, sd, subscriptionKey); @@ -1394,6 +1446,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.state = OPEN_STATE; metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); + metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); Location[] inProgressTxRange = getInProgressTxLocationRange(); metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; tx.store(metadata.page, metadataMarshaller, true); @@ -1432,6 +1485,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + if (metadata.ackMessageFileMapLocation != null) { + int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); + gcCandidateSet.remove(dataFileId); + if (LOG.isTraceEnabled()) { + LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet); + } + } + // Don't GC files referenced by in-progress tx if (inProgressTxRange[0] != null) { for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { @@ -1488,6 +1549,45 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } }); + + // Durable Subscription + if (entry.getValue().subLocations != null) { + Iterator> iter = entry.getValue().subLocations.iterator(tx); + while (iter.hasNext()) { + Entry subscription = iter.next(); + int dataFileId = subscription.getValue().getDataFileId(); + + // Move subscription along if it has no outstanding messages that need ack'd + // and its in the last log file in the journal. + if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { + final StoredDestination destination = entry.getValue(); + final String subscriptionKey = subscription.getKey(); + SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); + + // When pending is size one that is the next message Id meaning there + // are no pending messages currently. + if (pendingAcks == null || pendingAcks.size() <= 1) { + if (LOG.isTraceEnabled()) { + LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId); + } + + final KahaSubscriptionCommand kahaSub = + destination.subscriptions.get(tx, subscriptionKey); + destination.subLocations.put( + tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); + + // Skips the remove from candidates if we rewrote the subscription + // in order to prevent duplicate subscription commands on recover. + // If another subscription is on the same file and isn't rewritten + // than it will remove the file from the set. + continue; + } + } + + gcCandidateSet.remove(dataFileId); + } + } + if (LOG.isTraceEnabled()) { LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); } @@ -1501,7 +1601,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Iterator candidates = gcCandidateSet.iterator(); while (candidates.hasNext()) { Integer candidate = candidates.next(); - Set referencedFileIds = ackMessageFileMap.get(candidate); + Set referencedFileIds = metadata.ackMessageFileMap.get(candidate); if (referencedFileIds != null) { for (Integer referencedFileId : referencedFileIds) { if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) { @@ -1511,7 +1611,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } if (gcCandidateSet.contains(candidate)) { - ackMessageFileMap.remove(candidate); + metadata.ackMessageFileMap.remove(candidate); } else { if (LOG.isTraceEnabled()) { LOG.trace("not removing data file: " + candidate @@ -1537,6 +1637,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe public void run() { } }; + private Location checkpointProducerAudit() throws IOException { if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -1556,6 +1657,35 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return metadata.producerSequenceIdTrackerLocation; } + private Location checkpointAckMessageFileMap() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oout = new ObjectOutputStream(baos); + oout.writeObject(metadata.ackMessageFileMap); + oout.flush(); + oout.close(); + // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false + Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); + try { + location.getLatch().await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.toString()); + } + return location; + } + + private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { + + ByteSequence sequence = toByteSequence(subscription); + Location location = journal.write(sequence, nullCompletionCallback) ; + + try { + location.getLatch().await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.toString()); + } + return location; + } + public HashSet getJournalFilesBeingReplicated() { return journalFilesBeingReplicated; } @@ -1566,13 +1696,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private final HashMap storedDestinations = new HashMap(); - class StoredSubscription { - SubscriptionInfo subscriptionInfo; - String lastAckId; - Location lastAckLocation; - Location cursor; - } - static class MessageKeys { final String messageId; final Location location; @@ -1677,6 +1800,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe BTreeIndex subscriptionAcks; HashMap subscriptionCursors; ListIndex ackPositions; + ListIndex subLocations; // Transient data used to track which Messages are no longer needed. final TreeMap messageReferences = new TreeMap(); @@ -1730,6 +1854,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Now move the pending messages to ack data into the store backed // structure. value.ackPositions = new ListIndex(pageFile, tx.allocate()); + value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); + value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); + value.ackPositions.load(tx); for(String subscriptionKey : temp.keySet()) { value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); } @@ -1737,26 +1864,41 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } }); } + + if (metadata.version >= 5) { + value.subLocations = new ListIndex(pageFile, dataIn.readLong()); + } else { + // upgrade + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + value.subLocations = new ListIndex(pageFile, tx.allocate()); + value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); + value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); + value.subLocations.load(tx); + } + }); + } } if (metadata.version >= 2) { value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, dataIn.readLong()); } else { - // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); - value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.load(tx); + // upgrade + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + value.orderIndex.lowPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.load(tx); - value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); - value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.load(tx); - } - }); + value.orderIndex.highPriorityIndex = new BTreeIndex(pageFile, tx.allocate()); + value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.load(tx); + } + }); } return value; @@ -1772,6 +1914,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe dataOut.writeLong(value.subscriptions.getPageId()); dataOut.writeLong(value.subscriptionAcks.getPageId()); dataOut.writeLong(value.ackPositions.getHeadPageId()); + dataOut.writeLong(value.subLocations.getHeadPageId()); } else { dataOut.writeBoolean(false); } @@ -1840,6 +1983,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe rc.subscriptions = new BTreeIndex(pageFile, tx.allocate()); rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); rc.ackPositions = new ListIndex(pageFile, tx.allocate()); + rc.subLocations = new ListIndex(pageFile, tx.allocate()); } metadata.destinations.put(tx, key, rc); } @@ -1873,6 +2017,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); rc.ackPositions.load(tx); + rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); + rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); + rc.subLocations.load(tx); + rc.subscriptionCursors = new HashMap(); if (metadata.version < 3) { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java index 544148a520..03072fe612 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java @@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb; import java.io.IOException; +import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; @@ -53,8 +54,10 @@ public class Visitor { public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException { } - + public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException { } + public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException { + } } diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto index a95d940841..9d263087d9 100644 --- a/activemq-kahadb-store/src/main/proto/journal-data.proto +++ b/activemq-kahadb-store/src/main/proto/journal-data.proto @@ -30,6 +30,7 @@ enum KahaEntryType { KAHA_REMOVE_DESTINATION_COMMAND = 6; KAHA_SUBSCRIPTION_COMMAND = 7; KAHA_PRODUCER_AUDIT_COMMAND = 8; + KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9; } message KahaTraceCommand { @@ -40,7 +41,7 @@ message KahaTraceCommand { //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; //| option java_type_method = "KahaEntryType"; - + required string message = 1; } @@ -48,7 +49,7 @@ message KahaAddMessageCommand { //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; //| option java_type_method = "KahaEntryType"; - + optional KahaTransactionInfo transaction_info=1; required KahaDestination destination = 2; required string messageId = 3; @@ -120,10 +121,22 @@ message KahaProducerAuditCommand { //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; //| option java_type_method = "KahaEntryType"; - + required bytes audit = 1; } +message KahaAckMessageFileMapCommand { + // We make use of the wonky comment style bellow because the following options + // are not valid for protoc, but they are valid for the ActiveMQ proto compiler. + // In the ActiveMQ proto compiler, comments terminate with the pipe character: | + + //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand"; + //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException"; + //| option java_type_method = "KahaEntryType"; + + required bytes ackMessageFileMap = 1; +} + message KahaDestination { enum DestinationType { QUEUE = 0; @@ -154,8 +167,8 @@ message KahaXATransactionId { } message KahaLocation { - required int32 log_id = 1; - required int32 offset = 2; + required int32 log_id = 1; + required int32 offset = 2; } // TODO things to ponder diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java index b89c32b518..965a5f0ea6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java @@ -16,14 +16,13 @@ */ package org.apache.activemq.bugs; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -31,51 +30,95 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQ2832Test { private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class); BrokerService broker = null; + private ActiveMQConnectionFactory cf; private final Destination destination = new ActiveMQQueue("AMQ2832Test"); + private String connectionUri; - protected void startBroker(boolean delete) throws Exception { + protected void startBroker() throws Exception { + doStartBroker(true, false); + } + + protected void restartBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + doStartBroker(false, false); + } + + protected void recoverBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + doStartBroker(false, true); + } + + private void doStartBroker(boolean delete, boolean recover) throws Exception { broker = new BrokerService(); broker.setDeleteAllMessagesOnStartup(delete); broker.setPersistent(true); - broker.setUseJmx(false); + broker.setUseJmx(true); broker.addConnector("tcp://localhost:0"); - configurePersistence(broker, delete); + configurePersistence(broker, recover); + + connectionUri = "vm://localhost?create=false"; + cf = new ActiveMQConnectionFactory(connectionUri); broker.start(); LOG.info("Starting broker.."); } - protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { + protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception { KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); // ensure there are a bunch of data files but multiple entries in each adapter.setJournalMaxFileLength(1024 * 20); // speed up the test case, checkpoint an cleanup early and often - adapter.setCheckpointInterval(500); - adapter.setCleanupInterval(500); + adapter.setCheckpointInterval(5000); + adapter.setCleanupInterval(5000); - if (!deleteAllOnStart) { + if (recover) { adapter.setForceRecoverIndex(true); } + } + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } } @Test public void testAckRemovedMessageReplayedAfterRecovery() throws Exception { - startBroker(true); + startBroker(); StagedConsumer consumer = new StagedConsumer(); int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20); @@ -102,9 +145,9 @@ public class AMQ2832Test { broker.stop(); broker.waitUntilStopped(); - startBroker(false); + recoverBroker(); - consumer = new StagedConsumer(); + consumer = new StagedConsumer(); // need to force recovery? Message msg = consumer.receive(1, 5); @@ -115,10 +158,99 @@ public class AMQ2832Test { msg = consumer.receive(1, 5); assertEquals("Only one messages left after recovery: " + msg, null, msg); consumer.close(); - } - private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { + @Test + public void testAlternateLossScenario() throws Exception { + + startBroker(); + + ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); + ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue"); + ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); + + // This ensure that data file 1 never goes away. + createInactiveDurableSub(topic); + assertEquals(1, getNumberOfJournalFiles()); + + // One Queue Message that will be acked in another data file. + produceMessages(queue, 1); + assertEquals(1, getNumberOfJournalFiles()); + + // Add some messages to consume space + produceMessages(disposable, 50); + + int dataFilesCount = getNumberOfJournalFiles(); + assertTrue(dataFilesCount > 1); + + // Create an ack for the single message on this queue + drainQueue(queue); + + // Add some more messages to consume space beyond tha data file with the ack + produceMessages(disposable, 50); + + assertTrue(dataFilesCount < getNumberOfJournalFiles()); + dataFilesCount = getNumberOfJournalFiles(); + + restartBroker(); + + // Clear out all queue data + broker.getAdminView().removeQueue(disposable.getQueueName()); + + // Once this becomes true our ack could be lost. + assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() <= 3; + } + }, TimeUnit.MINUTES.toMillis(3))); + + // Recover and the Message should not be replayed but if the old MessageAck is lost + // then it could be. + recoverBroker(); + + assertTrue(drainQueue(queue) == 0); + } + + private int getNumberOfJournalFiles() throws IOException { + Collection files = + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + + return reality; + } + + private void createInactiveDurableSub(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + consumer.close(); + connection.close(); + produceMessages(topic, 1); + } + + private int drainQueue(Queue queue) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + int count = 0; + while (consumer.receive(5000) != null) { + count++; + } + consumer.close(); + connection.close(); + return count; + } + + private int produceMessages(Destination destination, int numToSend) throws Exception { int sent = 0; Connection connection = new ActiveMQConnectionFactory( broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); @@ -133,10 +265,14 @@ public class AMQ2832Test { } finally { connection.close(); } - + return sent; } + private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { + return produceMessages(destination, numToSend); + } + final String payload = new String(new byte[1024]); private Message createMessage(Session session, int i) throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java new file mode 100644 index 0000000000..141a8810f0 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4212Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class); + + private BrokerService service; + private String connectionUri; + private ActiveMQConnectionFactory cf; + + private final int MSG_COUNT = 256; + + @Before + public void setUp() throws IOException, Exception { + createBroker(true, false); + } + + public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception { + service = new BrokerService(); + service.setBrokerName("InactiveSubTest"); + service.setDeleteAllMessagesOnStartup(deleteAllMessages); + service.setAdvisorySupport(false); + service.setPersistent(true); + service.setUseJmx(true); + service.setKeepDurableSubsActive(false); + + KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter(); + File dataFile=new File("KahaDB"); + pa.setDirectory(dataFile); + pa.setJournalMaxFileLength(10*1024); + pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5)); + pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5)); + pa.setForceRecoverIndex(recover); + + service.setPersistenceAdapter(pa); + service.start(); + service.waitUntilStarted(); + + connectionUri = "vm://InactiveSubTest?create=false"; + cf = new ActiveMQConnectionFactory(connectionUri); + } + + private void restartBroker() throws Exception { + stopBroker(); + createBroker(false, false); + } + + private void recoverBroker() throws Exception { + stopBroker(); + createBroker(false, true); + } + + @After + public void stopBroker() throws Exception { + if (service != null) { + service.stop(); + service.waitUntilStopped(); + service = null; + } + } + + @Test + public void testDirableSubPrefetchRecovered() throws Exception { + + ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); + ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); + + // Send to a Queue to create some journal files + sendMessages(queue); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + + createInactiveDurableSub(topic); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Now send some more to the queue to create even more files. + sendMessages(queue); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + LOG.info("Restarting the broker."); + restartBroker(); + LOG.info("Restarted the broker."); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Clear out all queue data + service.getAdminView().removeQueue(queue.getQueueName()); + + assertTrue("Less than two journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() <= 2; + } + }, TimeUnit.MINUTES.toMillis(2))); + + LOG.info("Sending {} Messages to the Topic.", MSG_COUNT); + // Send some messages to the inactive destination + sendMessages(topic); + + LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT); + assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic)); + + LOG.info("Recovering the broker."); + recoverBroker(); + LOG.info("Recovering the broker."); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + } + + @Test + public void testDurableAcksNotDropped() throws Exception { + + ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); + ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); + + // Create durable sub in first data file. + createInactiveDurableSub(topic); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Send to a Topic + sendMessages(topic, 1); + + // Send to a Queue to create some journal files + sendMessages(queue); + + LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles()); + + // Consume all the Messages leaving acks behind. + consumeDurableMessages(topic, 1); + + LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles()); + + // Now send some more to the queue to create even more files. + sendMessages(queue); + + LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + LOG.info("Restarting the broker."); + restartBroker(); + LOG.info("Restarted the broker."); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Clear out all queue data + service.getAdminView().removeQueue(queue.getQueueName()); + + assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() <= 3; + } + }, TimeUnit.MINUTES.toMillis(3))); + + // See if we receive any message they should all be acked. + tryConsumeExpectNone(topic); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + + LOG.info("Recovering the broker."); + recoverBroker(); + LOG.info("Recovering the broker."); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // See if we receive any message they should all be acked. + tryConsumeExpectNone(topic); + + assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() == 1; + } + }, TimeUnit.MINUTES.toMillis(1))); + } + + private int getNumberOfJournalFiles() throws IOException { + Collection files = + ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + + return reality; + } + + private void createInactiveDurableSub(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + consumer.close(); + connection.close(); + } + + private void consumeDurableMessages(Topic topic, int count) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + connection.start(); + for (int i = 0; i < count; ++i) { + if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) { + fail("should have received a message"); + } + } + consumer.close(); + connection.close(); + } + + private void tryConsumeExpectNone(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + connection.start(); + if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) { + fail("Should be no messages for this durable."); + } + consumer.close(); + connection.close(); + } + + private int consumeFromInactiveDurableSub(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + + int count = 0; + + while (consumer.receive(10000) != null) { + count++; + } + + consumer.close(); + connection.close(); + + return count; + } + + private void sendMessages(Destination destination) throws Exception { + sendMessages(destination, MSG_COUNT); + } + + private void sendMessages(Destination destination, int count) throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < count; ++i) { + TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination); + producer.send(message); + } + connection.close(); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java index e53edc587d..c163759f28 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java @@ -16,18 +16,27 @@ */ package org.apache.activemq.store.kahadb; -import junit.framework.TestCase; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.util.IOHelper; - -import javax.jms.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.security.ProtectionDomain; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * @author chirino */ @@ -43,36 +52,36 @@ public class KahaDBVersionTest extends TestCase { } static final Logger LOG = LoggerFactory.getLogger(KahaDBVersionTest.class); - final static File VERSION_1_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1"); - final static File VERSION_2_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2"); - final static File VERSION_3_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3"); + final static File VERSION_1_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1"); + final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2"); + final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3"); + final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4"); BrokerService broker = null; protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception { - broker = new BrokerService(); broker.setUseJmx(false); broker.setPersistenceAdapter(kaha); broker.start(); return broker; - } + @Override protected void tearDown() throws Exception { if (broker != null) { broker.stop(); } } - + public void XtestCreateStore() throws Exception { KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX"); + File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4"); IOHelper.deleteFile(dir); kaha.setDirectory(dir); - kaha.setJournalMaxFileLength(1024*1024); + kaha.setJournalMaxFileLength(1024 * 1024); BrokerService broker = createBroker(kaha); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = cf.createConnection(); connection.setClientID("test"); connection.start(); @@ -85,33 +94,37 @@ public class KahaDBVersionTest extends TestCase { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic"); Queue queue = session.createQueue("test.queue"); - MessageConsumer consumer = session.createDurableSubscriber(topic,"test"); + MessageConsumer consumer = session.createDurableSubscriber(topic, "test"); consumer.close(); MessageProducer producer = session.createProducer(topic); producer.setPriority(9); - for (int i =0; i < numToSend; i++) { - Message msg = session.createTextMessage("test message:"+i); + for (int i = 0; i < numToSend; i++) { + Message msg = session.createTextMessage("test message:" + i); producer.send(msg); } - LOG.info("sent " + numToSend +" to topic"); + LOG.info("sent " + numToSend + " to topic"); producer = session.createProducer(queue); - for (int i =0; i < numToSend; i++) { - Message msg = session.createTextMessage("test message:"+i); + for (int i = 0; i < numToSend; i++) { + Message msg = session.createTextMessage("test message:" + i); producer.send(msg); } - LOG.info("sent " + numToSend +" to queue"); + LOG.info("sent " + numToSend + " to queue"); } - public void testVersion1Conversion() throws Exception{ - doConvertRestartCycle(VERSION_1_DB); + public void testVersion1Conversion() throws Exception { + doConvertRestartCycle(VERSION_1_DB); } - public void testVersion2Conversion() throws Exception{ - doConvertRestartCycle(VERSION_2_DB); + public void testVersion2Conversion() throws Exception { + doConvertRestartCycle(VERSION_2_DB); } - public void testVersion3Conversion() throws Exception{ - doConvertRestartCycle(VERSION_3_DB); + public void testVersion3Conversion() throws Exception { + doConvertRestartCycle(VERSION_3_DB); + } + + public void testVersion4Conversion() throws Exception { + doConvertRestartCycle(VERSION_4_DB); } public void doConvertRestartCycle(File existingStore) throws Exception { @@ -145,7 +158,7 @@ public class KahaDBVersionTest extends TestCase { for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) { TextMessage msg = (TextMessage) queueConsumer.receive(10000); count++; - //System.err.println(msg.getText()); + // System.err.println(msg.getText()); assertNotNull(msg); } LOG.info("Consumed " + count + " from queue"); @@ -154,12 +167,12 @@ public class KahaDBVersionTest extends TestCase { for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) { TextMessage msg = (TextMessage) topicConsumer.receive(10000); count++; - //System.err.println(msg.getText()); + // System.err.println(msg.getText()); assertNotNull(msg); } LOG.info("Consumed " + count + " from topic"); connection.close(); - + broker.stop(); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java index d4a9d80b61..0afc8da7a9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java @@ -20,13 +20,16 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; + import junit.framework.Test; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; @@ -44,8 +47,9 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org public int messageCount = 10000; private BrokerService broker; private ActiveMQTopic topic; - private List exceptions = new ArrayList(); + private final List exceptions = new ArrayList(); + @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); connectionFactory.setWatchTopicAdvisories(false); @@ -68,6 +72,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org return suite(DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.class); } + @Override protected void setUp() throws Exception { exceptions.clear(); topic = (ActiveMQTopic) createDestination(); @@ -75,6 +80,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org super.setUp(); } + @Override protected void tearDown() throws Exception { super.tearDown(); destroyBroker(); @@ -128,6 +134,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org final CountDownLatch goOn = new CountDownLatch(1); Thread sendThread = new Thread() { + @Override public void run() { try { @@ -208,10 +215,10 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org LOG.info("Store free page count: " + store.getPageFile().getFreePageCount()); LOG.info("Store page in-use: " + (store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount())); - assertTrue("no leak of pages, always use just 10", Wait.waitFor(new Wait.Condition() { + assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return 10 == store.getPageFile().getPageCount() - + return 11 == store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount(); } }, TimeUnit.SECONDS.toMillis(10))); @@ -236,6 +243,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org Listener() { } + @Override public void onMessage(Message message) { count++; if (id != null) {