From 11ae61f5398445c05da2b5dc0c32d2633921335e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 14 Jan 2014 12:23:52 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4969 --- .../scheduler/JobSchedulerStoreImpl.java | 48 ++++++------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 40a027db20..593491486d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -30,8 +30,11 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.broker.LockableServiceSupport; +import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.store.SharedFileLocker; import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; @@ -43,13 +46,11 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.LockFile; import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.ServiceSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore { +public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore { static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; @@ -59,7 +60,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule private File directory; PageFile pageFile; private Journal journal; - LockFile lockFile; protected AtomicLong journalSize = new AtomicLong(0); private boolean failIfDatabaseIsLocked; private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; @@ -248,7 +248,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); } IOHelper.mkdirs(this.directory); - lock(); this.journal = new Journal(); this.journal.setDirectory(directory); this.journal.setMaxFileLength(getJournalMaxFileLength()); @@ -302,10 +301,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule if (this.journal != null) { journal.close(); } - if (this.lockFile != null) { - this.lockFile.unlock(); - } - this.lockFile = null; LOG.info(this + " stopped"); } @@ -340,30 +335,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule return this.journal.write(payload, sync); } - private void lock() throws IOException { - if (lockFile == null) { - File lockFileName = new File(directory, "lock"); - lockFile = new LockFile(lockFileName, true); - if (failIfDatabaseIsLocked) { - lockFile.lock(); - } else { - while (true) { - try { - lockFile.lock(); - break; - } catch (IOException e) { - LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) - + " seconds for the database to be unlocked. Reason: " + e); - try { - Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); - } catch (InterruptedException e1) { - } - } - } - } - } - } - PageFile getPageFile() { this.pageFile.isLoaded(); return this.pageFile; @@ -405,4 +376,15 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule public String toString() { return "JobSchedulerStore:" + this.directory; } + + @Override + public Locker createDefaultLocker() throws IOException { + SharedFileLocker locker = new SharedFileLocker(); + locker.setDirectory(this.getDirectory()); + return locker; + } + + @Override + public void init() throws Exception { + } }