Timothy Bish 2014-01-14 12:23:52 -05:00
parent 08f21ed71d
commit 11ae61f539
1 changed files with 15 additions and 33 deletions

View File

@ -30,8 +30,11 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
@ -43,13 +46,11 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore {
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@ -59,7 +60,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
private File directory;
PageFile pageFile;
private Journal journal;
LockFile lockFile;
protected AtomicLong journalSize = new AtomicLong(0);
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@ -248,7 +248,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
}
IOHelper.mkdirs(this.directory);
lock();
this.journal = new Journal();
this.journal.setDirectory(directory);
this.journal.setMaxFileLength(getJournalMaxFileLength());
@ -302,10 +301,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
if (this.journal != null) {
journal.close();
}
if (this.lockFile != null) {
this.lockFile.unlock();
}
this.lockFile = null;
LOG.info(this + " stopped");
}
@ -340,30 +335,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
return this.journal.write(payload, sync);
}
private void lock() throws IOException {
if (lockFile == null) {
File lockFileName = new File(directory, "lock");
lockFile = new LockFile(lockFileName, true);
if (failIfDatabaseIsLocked) {
lockFile.lock();
} else {
while (true) {
try {
lockFile.lock();
break;
} catch (IOException e) {
LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
+ " seconds for the database to be unlocked. Reason: " + e);
try {
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
}
}
}
}
}
}
PageFile getPageFile() {
this.pageFile.isLoaded();
return this.pageFile;
@ -405,4 +376,15 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
@Override
public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker();
locker.setDirectory(this.getDirectory());
return locker;
}
@Override
public void init() throws Exception {
}
}