- added some handy generic visitors to the BTreeVisitor class.

- Updated the recovery process so it now rollsback changes applied to the index which did not get synced to the journal.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741659 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-02-06 18:19:31 +00:00
parent ca80106573
commit d761e80ae4
2 changed files with 146 additions and 15 deletions

View File

@ -360,25 +360,64 @@ public class MessageDatabase {
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
if( recoveryPosition ==null ) {
return;
if( recoveryPosition!=null ) {
int redoCounter = 0;
while (recoveryPosition != null) {
JournalCommand message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
process(message, recoveryPosition);
redoCounter++;
recoveryPosition = journal.getNextLocation(recoveryPosition);
}
long end = System.currentTimeMillis();
LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
}
int redoCounter = 0;
LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
while (recoveryPosition != null) {
JournalCommand message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
process(message, recoveryPosition);
redoCounter++;
recoveryPosition = journal.getNextLocation(recoveryPosition);
}
long end = System.currentTimeMillis();
LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
// We may have to undo some index updates.
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
recoverIndex(tx);
}
});
}
}
protected void recoverIndex(Transaction tx) throws IOException {
long start = System.currentTimeMillis();
// It is possible index updates got applied before the journal updates..
// in that case we need to removed references to messages that are not in the journal
final Location lastAppendLocation = journal.getLastAppendLocation();
long undoCounter=0;
// Go through all the destinations to see if they have messages past the lastAppendLocation
for (StoredDestination sd : storedDestinations.values()) {
final ArrayList<Long> matches = new ArrayList<Long>();
// Find all the Locations that are >= than the last Append Location.
sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
@Override
protected void matched(Location key, Long value) {
matches.add(value);
}
});
for (Long sequenceId : matches) {
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
}
long end = System.currentTimeMillis();
if( undoCounter > 0 ) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
}
private Location nextRecoveryPosition;
private Location lastRecoveryPosition;

View File

@ -43,4 +43,96 @@ public interface BTreeVisitor<Key,Value> {
*/
void visit(List<Key> keys, List<Value> values);
abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
final private Key value;
public GTVisitor(Key value) {
this.value = value;
}
public boolean isInterestedInKeysBetween(Key first, Key second) {
return second==null || second.compareTo(value)>0;
}
public void visit(List<Key> keys, List<Value> values) {
for( int i=0; i < keys.size(); i++) {
Key key = keys.get(i);
if( key.compareTo(value)>0 ) {
matched(key, values.get(i));
}
}
}
abstract protected void matched(Key key, Value value);
}
abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
final private Key value;
public GTEVisitor(Key value) {
this.value = value;
}
public boolean isInterestedInKeysBetween(Key first, Key second) {
return second==null || second.compareTo(value)>=0;
}
public void visit(List<Key> keys, List<Value> values) {
for( int i=0; i < keys.size(); i++) {
Key key = keys.get(i);
if( key.compareTo(value)>=0 ) {
matched(key, values.get(i));
}
}
}
abstract protected void matched(Key key, Value value);
}
abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
final private Key value;
public LTVisitor(Key value) {
this.value = value;
}
public boolean isInterestedInKeysBetween(Key first, Key second) {
return first==null || first.compareTo(value)<0;
}
public void visit(List<Key> keys, List<Value> values) {
for( int i=0; i < keys.size(); i++) {
Key key = keys.get(i);
if( key.compareTo(value)<0 ) {
matched(key, values.get(i));
}
}
}
abstract protected void matched(Key key, Value value);
}
abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
final private Key value;
public LTEVisitor(Key value) {
this.value = value;
}
public boolean isInterestedInKeysBetween(Key first, Key second) {
return first==null || first.compareTo(value)<=0;
}
public void visit(List<Key> keys, List<Value> values) {
for( int i=0; i < keys.size(); i++) {
Key key = keys.get(i);
if( key.compareTo(value)<=0 ) {
matched(key, values.get(i));
}
}
}
abstract protected void matched(Key key, Value value);
}
}