This reverts commit 60b0c4f85a.

Inadvertantly commited a bunch of changes by mistake
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-05-09 19:05:43 +00:00
parent 60b0c4f85a
commit db3f8b3554
1 changed files with 59 additions and 40 deletions

View File

@ -111,6 +111,8 @@ import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
protected BrokerService brokerService; protected BrokerService brokerService;
@ -469,12 +471,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
checkpointLock.writeLock().unlock(); checkpointLock.writeLock().unlock();
} }
journal.close(); journal.close();
synchronized(schedulerLock) {
if (scheduler != null) {
ThreadPoolUtils.shutdownGraceful(scheduler, -1); ThreadPoolUtils.shutdownGraceful(scheduler, -1);
scheduler = null;
}
}
// clear the cache and journalSize on shutdown of the store // clear the cache and journalSize on shutdown of the store
storeCache.clear(); storeCache.clear();
journalSize.set(0); journalSize.set(0);
@ -630,11 +627,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Location producerAuditPosition = recoverProducerAudit(); Location afterProducerAudit = recoverProducerAudit();
Location ackMessageFileLocation = recoverAckMessageFileMap(); Location afterAckMessageFile = recoverAckMessageFileMap();
Location lastIndoubtPosition = getRecoveryPosition(); Location lastIndoubtPosition = getRecoveryPosition();
Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
// valid checkpoint, possible recover from afterAckMessageFile
afterProducerAudit = null;
}
Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
if (recoveryPosition != null) { if (recoveryPosition != null) {
@ -716,16 +717,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return TransactionIdConversion.convertToLocal(tx); return TransactionIdConversion.convertToLocal(tx);
} }
private Location minimum(Location producerAuditPosition, private Location minimum(Location x,
Location lastIndoubtPosition) { Location y) {
Location min = null; Location min = null;
if (producerAuditPosition != null) { if (x != null) {
min = producerAuditPosition; min = x;
if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { if (y != null) {
min = lastIndoubtPosition; int compare = y.compareTo(x);
if (compare < 0) {
min = y;
}
} }
} else { } else {
min = lastIndoubtPosition; min = y;
} }
return min; return min;
} }
@ -740,7 +744,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Cannot recover message audit", e); LOG.warn("Cannot recover message audit", e);
return journal.getNextLocation(null); return journal.getNextLocation(null);
@ -758,7 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try { try {
ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
return journal.getNextLocation(metadata.ackMessageFileMapLocation); return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Cannot recover ackMessageFileMap", e); LOG.warn("Cannot recover ackMessageFileMap", e);
return journal.getNextLocation(null); return journal.getNextLocation(null);
@ -986,13 +990,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Perhaps there were no transactions... // Perhaps there were no transactions...
if( metadata.lastUpdate!=null) { if( metadata.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file. // Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(metadata.lastUpdate); return getNextInitializedLocation(metadata.lastUpdate);
} }
} }
// This loads the first position. // This loads the first position.
return journal.getNextLocation(null); return journal.getNextLocation(null);
} }
private Location getNextInitializedLocation(Location location) throws IOException {
Location mayNotBeInitialized = journal.getNextLocation(location);
if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
// need to init size and type to skip
return journal.getNextLocation(mayNotBeInitialized);
} else {
return mayNotBeInitialized;
}
}
protected void checkpointCleanup(final boolean cleanup) throws IOException { protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start; long start;
this.indexLock.writeLock().lock(); this.indexLock.writeLock().lock();
@ -1865,15 +1879,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@Override @Override
public void run() { public void run() {
int journalToAdvance = -1;
Set<Integer> journalLogsReferenced = new HashSet<Integer>();
// Lock index to capture the ackMessageFileMap data // Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock(); indexLock.writeLock().lock();
try {
// Map keys might not be sorted, find the earliest log file to forward acks // Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed. // from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction. // We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds); Collections.sort(journalFileIds);
int journalToAdvance = -1;
for (Integer journalFileId : journalFileIds) { for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId); DataFile current = journal.getDataFileById(journalFileId);
if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
@ -1887,10 +1905,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return; return;
} }
Set<Integer> journalLogsReferenced = journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
} finally {
indexLock.writeLock().unlock(); indexLock.writeLock().unlock();
}
try { try {
// Background rewrite of the old acks // Background rewrite of the old acks