The ack compaction task now acquires the checkpoint lock while it runs
to prevent a checkpoint from running at the same time unintentionally.
Also, calls to journal.getNextLocation made while forwarding acks are now
wrapped in a try/catch so that an IOException is logged and the forward
aborts gracefully.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-05-11 12:26:16 +00:00
parent c81a9348ee
commit e53e340262
1 changed files with 73 additions and 36 deletions

View File

@ -1888,46 +1888,67 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int journalToAdvance = -1;
Set<Integer> journalLogsReferenced = new HashSet<Integer>();
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
try {
// Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds);
for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId);
if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
journalToAdvance = journalFileId;
break;
//acquire the checkpoint lock to prevent other threads from
//running a checkpoint while this is running
//
//Normally this task runs on the same executor as the checkpoint task
//so this ack compaction runner wouldn't run at the same time as the checkpoint task.
//
//However, there are two cases where this isn't always true.
//First, the checkpoint() method is public and can be called through the
//PersistenceAdapter interface by someone at the same time this is running.
//Second, a checkpoint is called during shutdown without using the executor.
//
//In the future it might be better to just remove the checkpointLock entirely
//and only use the executor but this would need to be examined for any unintended
//consequences
checkpointLock.writeLock().lock();
try {
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
// Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds);
for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId);
if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
journalToAdvance = journalFileId;
break;
}
}
// Check if we found one, or if we only found the current file being written to.
if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
return;
}
journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
} finally {
indexLock.writeLock().unlock();
}
// Check if we found one, or if we only found the current file being written to.
if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
return;
try {
// Background rewrite of the old acks
forwardAllAcks(journalToAdvance, journalLogsReferenced);
// Checkpoint with changes from the ackMessageFileMap
checkpointUpdate(false);
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
} catch (Throwable e) {
LOG.error("Checkpoint failed", e);
brokerService.handleIOException(IOExceptionSupport.create(e));
}
journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
} finally {
indexLock.writeLock().unlock();
}
try {
// Background rewrite of the old acks
forwardAllAcks(journalToAdvance, journalLogsReferenced);
// Checkpoint with changes from the ackMessageFileMap
checkpointUpdate(false);
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
} catch (Throwable e) {
LOG.error("Checkpoint failed", e);
brokerService.handleIOException(IOExceptionSupport.create(e));
checkpointLock.writeLock().unlock();
}
}
}
@ -1949,7 +1970,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
JournalCommand<?> command = null;
try {
@ -1964,7 +1985,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
}
nextLocation = journal.getNextLocation(nextLocation);
nextLocation = getNextLocationForAckForward(nextLocation);
}
}
@ -1994,6 +2015,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
}
/**
 * Looks up the journal location that follows {@code nextLocation} while
 * forwarding acks, without propagating an {@link IOException} to the caller.
 *
 * A failure of {@code journal.getNextLocation()} is not expected in the
 * normal case. When it does happen it is logged (message at WARN, full
 * stack trace at DEBUG) and {@code null} is returned so the caller's
 * loop can stop gracefully.
 *
 * @param nextLocation the location to advance from
 * @return whatever {@code journal.getNextLocation(nextLocation)} returns,
 *         or {@code null} when that call throws an {@link IOException}
 */
private Location getNextLocationForAckForward(final Location nextLocation) {
    try {
        return journal.getNextLocation(nextLocation);
    } catch (IOException e) {
        LOG.warn("Failed to load next journal location: {}", e.getMessage());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Failed to load next journal location", e);
        }
        // Signal "no further location" so the ack forward ends cleanly.
        return null;
    }
}
final Runnable nullCompletionCallback = new Runnable() {
@Override
public void run() {