diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index f148971e86..c1af2fe0e5 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -111,8 +111,6 @@ import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; - public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { protected BrokerService brokerService; @@ -471,7 +469,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe checkpointLock.writeLock().unlock(); } journal.close(); - ThreadPoolUtils.shutdownGraceful(scheduler, -1); + synchronized(schedulerLock) { + if (scheduler != null) { + ThreadPoolUtils.shutdownGraceful(scheduler, -1); + scheduler = null; + } + } // clear the cache and journalSize on shutdown of the store storeCache.clear(); journalSize.set(0); @@ -627,15 +630,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { long start = System.currentTimeMillis(); - Location afterProducerAudit = recoverProducerAudit(); - Location afterAckMessageFile = recoverAckMessageFileMap(); + Location producerAuditPosition = recoverProducerAudit(); + Location ackMessageFileLocation = recoverAckMessageFileMap(); Location lastIndoubtPosition = getRecoveryPosition(); - if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) { - // valid checkpoint, possible recover from afterAckMessageFile - afterProducerAudit = null; - } - Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile); + Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); if (recoveryPosition != null) { @@ -717,19 +716,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return TransactionIdConversion.convertToLocal(tx); } - private Location minimum(Location x, - Location y) { + private Location minimum(Location producerAuditPosition, + Location lastIndoubtPosition) { Location min = null; - if (x != null) { - min = x; - if (y != null) { - int compare = y.compareTo(x); - if (compare < 0) { - min = y; - } + if (producerAuditPosition != null) { + min = producerAuditPosition; + if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { + min = lastIndoubtPosition; } } else { - min = y; + min = lastIndoubtPosition; } return min; } @@ -744,7 +740,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); - return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation); + return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); } catch (Exception e) { LOG.warn("Cannot recover message audit", e); return journal.getNextLocation(null); @@ -762,7 +758,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); metadata.ackMessageFileMap = (Map>) objectIn.readObject(); - return getNextInitializedLocation(metadata.ackMessageFileMapLocation); + return journal.getNextLocation(metadata.ackMessageFileMapLocation); } catch (Exception e) { LOG.warn("Cannot recover ackMessageFileMap", e); return journal.getNextLocation(null); @@ -990,23 +986,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Perhaps there were no transactions... if( metadata.lastUpdate!=null) { // Start replay at the record after the last one recorded in the index file. - return getNextInitializedLocation(metadata.lastUpdate); + return journal.getNextLocation(metadata.lastUpdate); } } // This loads the first position. return journal.getNextLocation(null); } - private Location getNextInitializedLocation(Location location) throws IOException { - Location mayNotBeInitialized = journal.getNextLocation(location); - if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) { - // need to init size and type to skip - return journal.getNextLocation(mayNotBeInitialized); - } else { - return mayNotBeInitialized; - } - } - protected void checkpointCleanup(final boolean cleanup) throws IOException { long start; this.indexLock.writeLock().lock(); @@ -1879,38 +1865,33 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @Override public void run() { - - int journalToAdvance = -1; - Set journalLogsReferenced = new HashSet(); - // Lock index to capture the ackMessageFileMap data indexLock.writeLock().lock(); - try { - // Map keys might not be sorted, find the earliest log file to forward acks - // from and move only those, future cycles can chip away at more as needed. - // We won't move files that are themselves rewritten on a previous compaction. - List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); - Collections.sort(journalFileIds); - for (Integer journalFileId : journalFileIds) { - DataFile current = journal.getDataFileById(journalFileId); - if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { - journalToAdvance = journalFileId; - break; - } + // Map keys might not be sorted, find the earliest log file to forward acks + // from and move only those, future cycles can chip away at more as needed. + // We won't move files that are themselves rewritten on a previous compaction. + List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); + Collections.sort(journalFileIds); + int journalToAdvance = -1; + for (Integer journalFileId : journalFileIds) { + DataFile current = journal.getDataFileById(journalFileId); + if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { + journalToAdvance = journalFileId; + break; } - - // Check if we found one, or if we only found the current file being written to. - if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { - return; - } - - journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); - - } finally { - indexLock.writeLock().unlock(); } + // Check if we found one, or if we only found the current file being written to. + if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { + return; + } + + Set journalLogsReferenced = + new HashSet(metadata.ackMessageFileMap.get(journalToAdvance)); + + indexLock.writeLock().unlock(); + try { // Background rewrite of the old acks forwardAllAcks(journalToAdvance, journalLogsReferenced);