From e53e340262d5e57a11464c323606529430e9b832 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Wed, 11 May 2016 12:26:16 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6288 The ack compaction task now acquires the checkpoint lock while it runs to prevent a checkpoint from running at the same time unintentionally. Also, the call to journal.getNextLocation is now protected by a try/catch to handle errors. --- .../store/kahadb/MessageDatabase.java | 109 ++++++++++++------ 1 file changed, 73 insertions(+), 36 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index a3addccbb3..21530c259c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1888,46 +1888,67 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe int journalToAdvance = -1; Set journalLogsReferenced = new HashSet(); - // Lock index to capture the ackMessageFileMap data - indexLock.writeLock().lock(); - try { - // Map keys might not be sorted, find the earliest log file to forward acks - // from and move only those, future cycles can chip away at more as needed. - // We won't move files that are themselves rewritten on a previous compaction. 
- List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); - Collections.sort(journalFileIds); - for (Integer journalFileId : journalFileIds) { - DataFile current = journal.getDataFileById(journalFileId); - if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { - journalToAdvance = journalFileId; - break; + //acquire the checkpoint lock to prevent other threads from + //running a checkpoint while this is running + // + //Normally this task runs on the same executor as the checkpoint task + //so this ack compaction runner wouldn't run at the same time as the checkpoint task. + // + //However, there are two cases where this isn't always true. + //First, the checkpoint() method is public and can be called through the + //PersistenceAdapter interface by someone at the same time this is running. + //Second, a checkpoint is called during shutdown without using the executor. + // + //In the future it might be better to just remove the checkpointLock entirely + //and only use the executor but this would need to be examined for any unintended + //consequences + checkpointLock.writeLock().lock(); + + try { + + // Lock index to capture the ackMessageFileMap data + indexLock.writeLock().lock(); + + // Map keys might not be sorted, find the earliest log file to forward acks + // from and move only those, future cycles can chip away at more as needed. + // We won't move files that are themselves rewritten on a previous compaction. + List journalFileIds = new ArrayList(metadata.ackMessageFileMap.keySet()); + Collections.sort(journalFileIds); + for (Integer journalFileId : journalFileIds) { + DataFile current = journal.getDataFileById(journalFileId); + if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { + journalToAdvance = journalFileId; + break; + } } + + // Check if we found one, or if we only found the current file being written to. 
+ if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { + return; + } + + journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); + + } finally { + indexLock.writeLock().unlock(); } - // Check if we found one, or if we only found the current file being written to. - if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { - return; + try { + // Background rewrite of the old acks + forwardAllAcks(journalToAdvance, journalLogsReferenced); + + // Checkpoint with changes from the ackMessageFileMap + checkpointUpdate(false); + } catch (IOException ioe) { + LOG.error("Checkpoint failed", ioe); + brokerService.handleIOException(ioe); + } catch (Throwable e) { + LOG.error("Checkpoint failed", e); + brokerService.handleIOException(IOExceptionSupport.create(e)); } - - journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); - } finally { - indexLock.writeLock().unlock(); - } - - try { - // Background rewrite of the old acks - forwardAllAcks(journalToAdvance, journalLogsReferenced); - - // Checkpoint with changes from the ackMessageFileMap - checkpointUpdate(false); - } catch (IOException ioe) { - LOG.error("Checkpoint failed", ioe); - brokerService.handleIOException(ioe); - } catch (Throwable e) { - LOG.error("Checkpoint failed", e); - brokerService.handleIOException(IOExceptionSupport.create(e)); + checkpointLock.writeLock().unlock(); } } } @@ -1949,7 +1970,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs()); LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); - Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0)); + Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0)); while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) { JournalCommand command = 
null; try { @@ -1964,7 +1985,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); } - nextLocation = journal.getNextLocation(nextLocation); + nextLocation = getNextLocationForAckForward(nextLocation); } } @@ -1994,6 +2015,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); } + private Location getNextLocationForAckForward(final Location nextLocation) { + //getNextLocation() can throw an IOException, we should handle it and set + //nextLocation to null and abort gracefully + //Should not happen in the normal case + Location location = null; + try { + location = journal.getNextLocation(nextLocation); + } catch (IOException e) { + LOG.warn("Failed to load next journal location: {}", e.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to load next journal location", e); + } + } + return location; + } + final Runnable nullCompletionCallback = new Runnable() { @Override public void run() {