https://issues.apache.org/jira/browse/AMQ-3161 - Race condition in ActiveMQ Journal Checkpoint worker thread cleanup leads to multiple running instances

patch applied with thanks, one small mod, left un synced check in store(..) such that locking only occurs if thread needs a restart

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1063710 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-01-26 13:30:42 +00:00
parent 029c1e3e28
commit 62c6c8f8d8
1 changed files with 44 additions and 31 deletions

View File

@ -213,6 +213,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
private boolean checksumJournalFiles = false; private boolean checksumJournalFiles = false;
private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY; private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
protected boolean forceRecoverIndex = false; protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
public MessageDatabase() { public MessageDatabase() {
} }
@ -273,38 +274,49 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
private void startCheckpoint() { private void startCheckpoint() {
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { synchronized (checkpointThreadLock) {
@Override boolean start = false;
public void run() { if (checkpointThread == null) {
try { start = true;
long lastCleanup = System.currentTimeMillis(); } else if (!checkpointThread.isAlive()) {
long lastCheckpoint = System.currentTimeMillis(); start = true;
// Sleep for a short time so we can periodically check LOG.info("KahaDB: Recovering checkpoint thread after death");
// to see if we need to exit this thread. }
long sleepTime = Math.min(checkpointInterval, 500); if (start) {
while (opened.get()) { checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
Thread.sleep(sleepTime); @Override
long now = System.currentTimeMillis(); public void run() {
if( now - lastCleanup >= cleanupInterval ) { try {
checkpointCleanup(true); long lastCleanup = System.currentTimeMillis();
lastCleanup = now; long lastCheckpoint = System.currentTimeMillis();
lastCheckpoint = now; // Sleep for a short time so we can periodically check
} else if( now - lastCheckpoint >= checkpointInterval ) { // to see if we need to exit this thread.
checkpointCleanup(false); long sleepTime = Math.min(checkpointInterval, 500);
lastCheckpoint = now; while (opened.get()) {
Thread.sleep(sleepTime);
long now = System.currentTimeMillis();
if( now - lastCleanup >= cleanupInterval ) {
checkpointCleanup(true);
lastCleanup = now;
lastCheckpoint = now;
} else if( now - lastCheckpoint >= checkpointInterval ) {
checkpointCleanup(false);
lastCheckpoint = now;
}
}
} catch (InterruptedException e) {
// Looks like someone really wants us to exit this thread...
} catch (IOException ioe) {
LOG.error("Checkpoint failed", ioe);
brokerService.handleIOException(ioe);
} }
} }
} catch (InterruptedException e) { };
// Looks like someone really wants us to exit this thread...
} catch (IOException ioe) { checkpointThread.setDaemon(true);
LOG.error("Checkpoint failed", ioe); checkpointThread.start();
brokerService.handleIOException(ioe);
}
} }
}
};
checkpointThread.setDaemon(true);
checkpointThread.start();
} }
public void open() throws IOException { public void open() throws IOException {
@ -378,7 +390,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
this.indexLock.writeLock().unlock(); this.indexLock.writeLock().unlock();
} }
journal.close(); journal.close();
checkpointThread.join(); synchronized (checkpointThreadLock) {
checkpointThread.join();
}
lockFile.unlock(); lockFile.unlock();
lockFile=null; lockFile=null;
} }
@ -761,7 +775,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
this.indexLock.writeLock().unlock(); this.indexLock.writeLock().unlock();
} }
if (!checkpointThread.isAlive()) { if (!checkpointThread.isAlive()) {
LOG.info("KahaDB: Recovering checkpoint thread after exception");
startCheckpoint(); startCheckpoint();
} }
if (after != null) { if (after != null) {