diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 16fb99de86..9bf86f91be 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -360,25 +360,64 @@ public class MessageDatabase { long start = System.currentTimeMillis(); Location recoveryPosition = getRecoveryPosition(); - if( recoveryPosition ==null ) { - return; + if( recoveryPosition!=null ) { + int redoCounter = 0; + while (recoveryPosition != null) { + JournalCommand message = load(recoveryPosition); + metadata.lastUpdate = recoveryPosition; + process(message, recoveryPosition); + redoCounter++; + recoveryPosition = journal.getNextLocation(recoveryPosition); + } + long end = System.currentTimeMillis(); + LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); } - - int redoCounter = 0; - LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset()); - - while (recoveryPosition != null) { - JournalCommand message = load(recoveryPosition); - metadata.lastUpdate = recoveryPosition; - process(message, recoveryPosition); - redoCounter++; - recoveryPosition = journal.getNextLocation(recoveryPosition); - } - long end = System.currentTimeMillis(); - LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); + + // We may have to undo some index updates. + pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + recoverIndex(tx); + } + }); } } + protected void recoverIndex(Transaction tx) throws IOException { + long start = System.currentTimeMillis(); + // It is possible index updates got applied before the journal updates.. + // in that case we need to removed references to messages that are not in the journal + final Location lastAppendLocation = journal.getLastAppendLocation(); + long undoCounter=0; + + // Go through all the destinations to see if they have messages past the lastAppendLocation + for (StoredDestination sd : storedDestinations.values()) { + + final ArrayList matches = new ArrayList(); + // Find all the Locations that are >= than the last Append Location. + sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor(lastAppendLocation) { + @Override + protected void matched(Location key, Long value) { + matches.add(value); + } + }); + + + for (Long sequenceId : matches) { + MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); + sd.locationIndex.remove(tx, keys.location); + sd.messageIdIndex.remove(tx, keys.messageId); + undoCounter++; + // TODO: do we need to modify the ack positions for the pub sub case? + } + } + long end = System.currentTimeMillis(); + if( undoCounter > 0 ) { + // The rolledback operations are basically in flight journal writes. To avoid getting these the end user + // should do sync writes to the journal. + LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds."); + } + } + private Location nextRecoveryPosition; private Location lastRecoveryPosition; diff --git a/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java b/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java index a8a63c954b..35bc230a1b 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java @@ -43,4 +43,96 @@ public interface BTreeVisitor { */ void visit(List keys, List values); + + abstract class GTVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public GTVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return second==null || second.compareTo(value)>0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)>0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + + abstract class GTEVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public GTEVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return second==null || second.compareTo(value)>=0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)>=0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + + abstract class LTVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public LTVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return first==null || first.compareTo(value)<0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)<0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + + abstract class LTEVisitor, Value> implements BTreeVisitor{ + final private Key value; + + public LTEVisitor(Key value) { + this.value = value; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return first==null || first.compareTo(value)<=0; + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(value)<=0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } } \ No newline at end of file