Switching the checkpoint lock to a readlock when forwarding acks to
prevent other journal updates from being blocked.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-05-12 14:37:51 +00:00
parent 2e64abc38a
commit c8a6171d04
1 changed files with 21 additions and 7 deletions

View File

@ -1888,6 +1888,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int journalToAdvance = -1;
Set<Integer> journalLogsReferenced = new HashSet<Integer>();
//flag to know whether the ack forwarding completed without an exception
boolean forwarded = false;
try {
//acquire the checkpoint lock to prevent other threads from
//running a checkpoint while this is running
@ -1903,7 +1906,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
//In the future it might be better to just remove the checkpointLock entirely
//and only use the executor but this would need to be examined for any unintended
//consequences
checkpointLock.writeLock().lock();
checkpointLock.readLock().lock();
try {
@ -1937,9 +1940,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
// Background rewrite of the old acks
forwardAllAcks(journalToAdvance, journalLogsReferenced);
forwarded = true;
} catch (IOException ioe) {
LOG.error("Forwarding of acks failed", ioe);
brokerService.handleIOException(ioe);
} catch (Throwable e) {
LOG.error("Forwarding of acks failed", e);
brokerService.handleIOException(IOExceptionSupport.create(e));
}
} finally {
checkpointLock.readLock().unlock();
}
try {
if (forwarded) {
// Checkpoint with changes from the ackMessageFileMap
checkpointUpdate(false);
}
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
@ -1947,9 +1964,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.error("Checkpoint failed", e);
brokerService.handleIOException(IOExceptionSupport.create(e));
}
} finally {
checkpointLock.writeLock().unlock();
}
}
}