mirror of https://github.com/apache/activemq.git
better synchronization of the metadata.lastUpdate var
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741597 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
deea2d11a8
commit
51e82d59e7
|
@ -356,44 +356,48 @@ public class MessageDatabase {
|
|||
* @throws IllegalStateException
|
||||
*/
|
||||
private void recover() throws IllegalStateException, IOException {
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
Location recoveryPosition = getRecoveryPosition();
|
||||
if( recoveryPosition ==null ) {
|
||||
return;
|
||||
synchronized (indexMutex) {
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
Location recoveryPosition = getRecoveryPosition();
|
||||
if( recoveryPosition ==null ) {
|
||||
return;
|
||||
}
|
||||
|
||||
int redoCounter = 0;
|
||||
LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
|
||||
|
||||
while (recoveryPosition != null) {
|
||||
JournalCommand message = load(recoveryPosition);
|
||||
metadata.lastUpdate = recoveryPosition;
|
||||
process(message, recoveryPosition);
|
||||
redoCounter++;
|
||||
recoveryPosition = journal.getNextLocation(recoveryPosition);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
|
||||
int redoCounter = 0;
|
||||
LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
|
||||
|
||||
while (recoveryPosition != null) {
|
||||
JournalCommand message = load(recoveryPosition);
|
||||
metadata.lastUpdate = recoveryPosition;
|
||||
process(message, recoveryPosition);
|
||||
redoCounter++;
|
||||
recoveryPosition = journal.getNextLocation(recoveryPosition);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
|
||||
private Location nextRecoveryPosition;
|
||||
private Location lastRecoveryPosition;
|
||||
|
||||
public void incrementalRecover() throws IOException {
|
||||
if( nextRecoveryPosition == null ) {
|
||||
if( lastRecoveryPosition==null ) {
|
||||
nextRecoveryPosition = getRecoveryPosition();
|
||||
} else {
|
||||
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
|
||||
}
|
||||
}
|
||||
while (nextRecoveryPosition != null) {
|
||||
lastRecoveryPosition = nextRecoveryPosition;
|
||||
metadata.lastUpdate = lastRecoveryPosition;
|
||||
JournalCommand message = load(lastRecoveryPosition);
|
||||
process(message, lastRecoveryPosition);
|
||||
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
|
||||
synchronized (indexMutex) {
|
||||
if( nextRecoveryPosition == null ) {
|
||||
if( lastRecoveryPosition==null ) {
|
||||
nextRecoveryPosition = getRecoveryPosition();
|
||||
} else {
|
||||
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
|
||||
}
|
||||
}
|
||||
while (nextRecoveryPosition != null) {
|
||||
lastRecoveryPosition = nextRecoveryPosition;
|
||||
metadata.lastUpdate = lastRecoveryPosition;
|
||||
JournalCommand message = load(lastRecoveryPosition);
|
||||
process(message, lastRecoveryPosition);
|
||||
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -482,7 +486,9 @@ public class MessageDatabase {
|
|||
LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
|
||||
}
|
||||
|
||||
metadata.lastUpdate = location;
|
||||
synchronized (indexMutex) {
|
||||
metadata.lastUpdate = location;
|
||||
}
|
||||
return location;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue