mirror of https://github.com/apache/activemq.git
Clean up some code in MessageDatabase, add guards around logging to avoid big string ops when not needed. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1172640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
199c6838d4
commit
5325cdb30b
|
@ -147,7 +147,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
try {
|
||||
version = is.readInt();
|
||||
}catch (EOFException expectedOnUpgrade) {
|
||||
} catch (EOFException expectedOnUpgrade) {
|
||||
version=1;
|
||||
}
|
||||
LOG.info("KahaDB is version " + version);
|
||||
|
@ -215,7 +215,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
boolean enableIndexWriteAsync = false;
|
||||
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
|
||||
|
||||
|
||||
protected AtomicBoolean opened = new AtomicBoolean();
|
||||
private LockFile lockFile;
|
||||
private boolean ignoreMissingJournalfiles = false;
|
||||
|
@ -279,7 +278,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
});
|
||||
pageFile.flush();
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -379,7 +378,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
|
||||
public void load() throws IOException {
|
||||
|
||||
this.indexLock.writeLock().lock();
|
||||
try {
|
||||
lock();
|
||||
|
@ -395,13 +393,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
open();
|
||||
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void close() throws IOException, InterruptedException {
|
||||
if( opened.compareAndSet(true, false)) {
|
||||
this.indexLock.writeLock().lock();
|
||||
|
@ -413,7 +409,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
});
|
||||
pageFile.unload();
|
||||
metadata = new Metadata();
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
journal.close();
|
||||
|
@ -438,7 +434,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
});
|
||||
}
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
close();
|
||||
|
@ -499,9 +495,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
redoCounter++;
|
||||
recoveryPosition = journal.getNextLocation(recoveryPosition);
|
||||
}
|
||||
if (LOG.isInfoEnabled()) {
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
}
|
||||
|
||||
// We may have to undo some index updates.
|
||||
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
|
@ -520,11 +518,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
for (TransactionId tx: toRollback) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("rolling back recovered indoubt local transaction " + tx);
|
||||
}
|
||||
store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
|
||||
}
|
||||
}
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -584,7 +584,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
});
|
||||
|
||||
|
||||
for (Long sequenceId : matches) {
|
||||
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
|
||||
sd.locationIndex.remove(tx, keys.location);
|
||||
|
@ -595,12 +594,14 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
if( undoCounter > 0 ) {
|
||||
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
|
||||
// should do sync writes to the journal.
|
||||
// The rolledback operations are basically in flight journal writes. To avoid getting
|
||||
// these the end user should do sync writes to the journal.
|
||||
if (LOG.isInfoEnabled()) {
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
}
|
||||
|
||||
undoCounter = 0;
|
||||
start = System.currentTimeMillis();
|
||||
|
@ -637,34 +638,36 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
});
|
||||
}
|
||||
HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
|
||||
while( !ss.isEmpty() ) {
|
||||
missingJournalFiles.add( (int)ss.removeFirst() );
|
||||
while (!ss.isEmpty()) {
|
||||
missingJournalFiles.add((int) ss.removeFirst());
|
||||
}
|
||||
missingJournalFiles.removeAll( journal.getFileMap().keySet() );
|
||||
missingJournalFiles.removeAll(journal.getFileMap().keySet());
|
||||
|
||||
if( !missingJournalFiles.isEmpty() ) {
|
||||
LOG.info("Some journal files are missing: "+missingJournalFiles);
|
||||
if (!missingJournalFiles.isEmpty()) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Some journal files are missing: " + missingJournalFiles);
|
||||
}
|
||||
}
|
||||
|
||||
ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
|
||||
for (Integer missing : missingJournalFiles) {
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
|
||||
}
|
||||
|
||||
if ( checkForCorruptJournalFiles ) {
|
||||
if (checkForCorruptJournalFiles) {
|
||||
Collection<DataFile> dataFiles = journal.getFileMap().values();
|
||||
for (DataFile dataFile : dataFiles) {
|
||||
int id = dataFile.getDataFileId();
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
|
||||
Sequence seq = dataFile.getCorruptedBlocks().getHead();
|
||||
while( seq!=null ) {
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
|
||||
while (seq != null) {
|
||||
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
|
||||
seq = seq.getNext();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( !missingPredicates.isEmpty() ) {
|
||||
if (!missingPredicates.isEmpty()) {
|
||||
for (StoredDestination sd : storedDestinations.values()) {
|
||||
|
||||
final ArrayList<Long> matches = new ArrayList<Long>();
|
||||
|
@ -676,7 +679,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
});
|
||||
|
||||
// If somes message references are affected by the missing data files...
|
||||
if( !matches.isEmpty() ) {
|
||||
if (!matches.isEmpty()) {
|
||||
|
||||
// We either 'gracefully' recover dropping the missing messages or
|
||||
// we error out.
|
||||
|
@ -697,13 +700,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
|
||||
end = System.currentTimeMillis();
|
||||
if( undoCounter > 0 ) {
|
||||
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
|
||||
// should do sync writes to the journal.
|
||||
if (LOG.isInfoEnabled()) {
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Location nextRecoveryPosition;
|
||||
private Location lastRecoveryPosition;
|
||||
|
@ -725,7 +730,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
process(message, lastRecoveryPosition, (Runnable)null);
|
||||
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
|
||||
}
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -766,15 +771,17 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
checkpointUpdate(tx, cleanup);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
|
||||
LOG.info("Slow KahaDB access: cleanup took "+(end-start));
|
||||
}
|
||||
}
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Slow KahaDB access: cleanup took " + (end - start));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void checkpoint(Callback closure) throws Exception {
|
||||
this.indexLock.writeLock().lock();
|
||||
|
@ -785,7 +792,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
});
|
||||
closure.execute();
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -819,8 +826,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
process(data, location, after);
|
||||
long end = System.currentTimeMillis();
|
||||
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
|
||||
}
|
||||
}
|
||||
|
||||
if (after != null) {
|
||||
Runnable afterCompletion = null;
|
||||
|
@ -860,8 +869,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
ByteSequence data = journal.read(location);
|
||||
long end = System.currentTimeMillis();
|
||||
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
|
||||
}
|
||||
}
|
||||
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
|
||||
byte readByte = is.readByte();
|
||||
KahaEntryType type = KahaEntryType.valueOf(readByte);
|
||||
|
@ -960,7 +971,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
upadateIndex(tx, command, location);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -978,11 +989,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
updateIndex(tx, command, location);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
|
||||
|
@ -993,7 +1003,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
updateIndex(tx, command, location);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -1006,7 +1016,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
updateIndex(tx, command, location);
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
} finally {
|
||||
this.indexLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
@ -1255,7 +1265,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
|
||||
final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
|
||||
}
|
||||
|
||||
// Don't GC files under replication
|
||||
if( journalFilesBeingReplicated!=null ) {
|
||||
|
@ -1282,8 +1294,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
|
||||
}
|
||||
}
|
||||
|
||||
// Go through all the destinations to see if any of them can remove GC candidates.
|
||||
for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
|
||||
|
@ -1329,11 +1343,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
});
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
|
||||
}
|
||||
}
|
||||
|
||||
// check we are not deleting file with ack for in-use journal files
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("gc candidates: " + gcCandidateSet);
|
||||
}
|
||||
final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
|
||||
Iterator<Integer> candidates = gcCandidateSet.iterator();
|
||||
while (candidates.hasNext()) {
|
||||
|
@ -1350,14 +1368,18 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
if (gcCandidateSet.contains(candidate)) {
|
||||
ackMessageFileMap.remove(candidate);
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("not removing data file: " + candidate
|
||||
+ " as contained ack(s) refer to referenced file: " + referencedFileIds);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( !gcCandidateSet.isEmpty() ) {
|
||||
LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
|
||||
if (!gcCandidateSet.isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
|
||||
}
|
||||
journal.removeDataFiles(gcCandidateSet);
|
||||
}
|
||||
}
|
||||
|
@ -1382,7 +1404,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
// StoredDestination related implementation methods.
|
||||
// /////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
|
||||
|
||||
class StoredSubscription {
|
||||
|
@ -1615,7 +1636,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
return rc;
|
||||
}
|
||||
|
||||
|
||||
protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
|
||||
String key = key(destination);
|
||||
StoredDestination rc = storedDestinations.get(key);
|
||||
|
@ -2357,7 +2377,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
void remove(Transaction tx) throws IOException {
|
||||
defaultPriorityIndex.clear(tx);
|
||||
defaultPriorityIndex.unload(tx);
|
||||
|
@ -2508,8 +2527,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
final Iterator<Entry<Long, MessageKeys>>defaultIterator;
|
||||
final Iterator<Entry<Long, MessageKeys>>lowIterator;
|
||||
|
||||
|
||||
|
||||
MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
|
||||
this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
|
||||
if (highPriorityIndex != null) {
|
||||
|
@ -2560,6 +2577,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
if (currentIterator == defaultIterator) {
|
||||
if (lowIterator.hasNext()) {
|
||||
currentIterator = lowIterator;
|
||||
|
|
Loading…
Reference in New Issue