From 0484af1c61ca26ce8ea7e2decf953052e0a7ad1e Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 6 Nov 2012 22:05:06 +0000 Subject: [PATCH] Keep the broker.scheduler package free of kahadb impl specifics. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1406371 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/pom.xml | 10 +- .../activemq/broker/jmx/JobSchedulerView.java | 23 +- .../broker/scheduler/JobSchedulerStore.java | 392 +---------------- .../activemq/broker/scheduler/JobSupport.java | 39 ++ .../broker/scheduler/SchedulerBroker.java | 3 +- .../org/apache/activemq/store/PListStore.java | 3 +- .../kahadb}/scheduler/JobImpl.java | 24 +- .../kahadb}/scheduler/JobLocation.java | 2 +- .../kahadb}/scheduler/JobSchedulerImpl.java | 10 +- .../scheduler/JobSchedulerStoreImpl.java | 399 ++++++++++++++++++ .../scheduler/JobSchedulerStoreTest.java | 3 +- .../broker/scheduler/JobSchedulerTest.java | 3 +- 12 files changed, 484 insertions(+), 427 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java rename activemq-core/src/main/java/org/apache/activemq/{broker => store/kahadb}/scheduler/JobImpl.java (76%) rename activemq-core/src/main/java/org/apache/activemq/{broker => store/kahadb}/scheduler/JobLocation.java (99%) rename activemq-core/src/main/java/org/apache/activemq/{broker => store/kahadb}/scheduler/JobSchedulerImpl.java (98%) create mode 100644 activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index f02f50cbdc..0a41ebbc13 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -1136,11 +1136,11 @@ org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.* org/apache/activemq/xbean/XBeanXmlTest.* org/apache/bugs/AMQ1730Test.* - org/apache/bugs/LoadBalanceTest.* - org/apache/kahadb/index/BTreeIndexTest.* - org/apache/kahadb/index/HashIndexTest.* - org/apache/kahadb/index/ListIndexTest.* - org/apache/kahadb/util/DataByteArrayInputStreamTest.* + org/apache/activemq/store/kahadb/bugs/LoadBalanceTest.* + org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.* + org/apache/activemq/store/kahadb/disk/index/HashIndexTest.* + org/apache/activemq/store/kahadb/disk/index/ListIndexTest.* + org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java index bb31ad847c..9e5a1fba46 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java @@ -16,17 +16,14 @@ */ package org.apache.activemq.broker.jmx; -import java.io.IOException; -import java.util.List; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.scheduler.Job; -import org.apache.activemq.broker.scheduler.JobImpl; import org.apache.activemq.broker.scheduler.JobScheduler; +import org.apache.activemq.broker.scheduler.JobSupport; + +import javax.management.openmbean.*; +import java.io.IOException; +import java.util.List; public class JobSchedulerView implements JobSchedulerViewMBean { @@ -53,8 +50,8 @@ public class JobSchedulerView implements JobSchedulerViewMBean { CompositeType ct = factory.getCompositeType(); TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" }); TabularDataSupport rc = new TabularDataSupport(tt); - long start = JobImpl.getDataTime(startTime); - long finish = JobImpl.getDataTime(finishTime); + long start = JobSupport.getDataTime(startTime); + long finish = JobSupport.getDataTime(finishTime); List jobs = this.jobScheduler.getAllJobs(start, finish); for (Job job : jobs) { rc.put(new CompositeDataSupport(ct, factory.getFields(job))); @@ -76,7 +73,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean { public String getNextScheduleTime() throws Exception { long time = this.jobScheduler.getNextScheduleTime(); - return JobImpl.getDateTime(time); + return JobSupport.getDateTime(time); } public void removeAllJobs() throws Exception { @@ -85,8 +82,8 @@ public class JobSchedulerView implements JobSchedulerViewMBean { } public void removeAllJobs(String startTime, String finishTime) throws Exception { - long start = JobImpl.getDataTime(startTime); - long finish = JobImpl.getDataTime(finishTime); + long start = JobSupport.getDataTime(startTime); + long finish = JobSupport.getDataTime(finishTime); this.jobScheduler.removeAllJobs(start, finish); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java index eb2e7f459f..19ed89b7e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java @@ -1,392 +1,20 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.activemq.broker.scheduler; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.ServiceStopper; -import org.apache.activemq.util.ServiceSupport; -import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; -import org.apache.activemq.store.kahadb.disk.journal.Journal; -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Page; -import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; -import org.apache.activemq.util.LockFile; -import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; -import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.activemq.Service; -import java.io.DataInput; -import java.io.DataOutput; import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -public class JobSchedulerStore extends ServiceSupport { - static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class); - private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; +/** + * @author Hiram Chirino + */ +public interface JobSchedulerStore extends Service { + File getDirectory(); - public static final int CLOSED_STATE = 1; - public static final int OPEN_STATE = 2; + void setDirectory(File directory); - private File directory; - PageFile pageFile; - private Journal journal; - private LockFile lockFile; - private boolean failIfDatabaseIsLocked; - private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; - private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; - private boolean enableIndexWriteAsync = false; - // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; - MetaData metaData = new MetaData(this); - final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); - Map schedulers = new HashMap(); + long size(); - protected class MetaData { - protected MetaData(JobSchedulerStore store) { - this.store = store; - } - private final JobSchedulerStore store; - Page page; - BTreeIndex journalRC; - BTreeIndex storedSchedulers; - - void createIndexes(Transaction tx) throws IOException { - this.storedSchedulers = new BTreeIndex(pageFile, tx.allocate().getPageId()); - this.journalRC = new BTreeIndex(pageFile, tx.allocate().getPageId()); - } - - void load(Transaction tx) throws IOException { - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.storedSchedulers.load(tx); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.load(tx); - } - - void loadScheduler(Transaction tx, Map schedulers) throws IOException { - for (Iterator> i = this.storedSchedulers.iterator(tx); i.hasNext();) { - Entry entry = i.next(); - entry.getValue().load(tx); - schedulers.put(entry.getKey(), entry.getValue()); - } - } - - public void read(DataInput is) throws IOException { - this.storedSchedulers = new BTreeIndex(pageFile, is.readLong()); - this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); - this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); - this.journalRC = new BTreeIndex(pageFile, is.readLong()); - this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); - this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); - } - - public void write(DataOutput os) throws IOException { - os.writeLong(this.storedSchedulers.getPageId()); - os.writeLong(this.journalRC.getPageId()); - - } - } - - class MetaDataMarshaller extends VariableMarshaller { - private final JobSchedulerStore store; - - MetaDataMarshaller(JobSchedulerStore store) { - this.store = store; - } - public MetaData readPayload(DataInput dataIn) throws IOException { - MetaData rc = new MetaData(this.store); - rc.read(dataIn); - return rc; - } - - public void writePayload(MetaData object, DataOutput dataOut) throws IOException { - object.write(dataOut); - } - } - - class ValueMarshaller extends VariableMarshaller> { - public List readPayload(DataInput dataIn) throws IOException { - List result = new ArrayList(); - int size = dataIn.readInt(); - for (int i = 0; i < size; i++) { - JobLocation jobLocation = new JobLocation(); - jobLocation.readExternal(dataIn); - result.add(jobLocation); - } - return result; - } - - public void writePayload(List value, DataOutput dataOut) throws IOException { - dataOut.writeInt(value.size()); - for (JobLocation jobLocation : value) { - jobLocation.writeExternal(dataOut); - } - } - } - - class JobSchedulerMarshaller extends VariableMarshaller { - private final JobSchedulerStore store; - JobSchedulerMarshaller(JobSchedulerStore store) { - this.store = store; - } - public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { - JobSchedulerImpl result = new JobSchedulerImpl(this.store); - result.read(dataIn); - return result; - } - - public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { - js.write(dataOut); - } - } - - public File getDirectory() { - return directory; - } - - public void setDirectory(File directory) { - this.directory = directory; - } - - public long size() { - if ( !isStarted() ) { - return 0; - } - try { - return journal.getDiskSize() + pageFile.getDiskSize(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public JobScheduler getJobScheduler(final String name) throws Exception { - JobSchedulerImpl result = this.schedulers.get(name); - if (result == null) { - final JobSchedulerImpl js = new JobSchedulerImpl(this); - js.setName(name); - getPageFile().tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - js.createIndexes(tx); - js.load(tx); - metaData.storedSchedulers.put(tx, name, js); - } - }); - result = js; - this.schedulers.put(name, js); - if (isStarted()) { - result.start(); - } - this.pageFile.flush(); - } - return result; - } - - synchronized public boolean removeJobScheduler(final String name) throws Exception { - boolean result = false; - final JobSchedulerImpl js = this.schedulers.remove(name); - result = js != null; - if (result) { - js.stop(); - getPageFile().tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - metaData.storedSchedulers.remove(tx, name); - js.destroy(tx); - } - }); - } - return result; - } - - @Override - protected synchronized void doStart() throws Exception { - if (this.directory == null) { - this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); - } - IOHelper.mkdirs(this.directory); - lock(); - this.journal = new Journal(); - this.journal.setDirectory(directory); - this.journal.setMaxFileLength(getJournalMaxFileLength()); - this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); - this.journal.start(); - this.pageFile = new PageFile(directory, "scheduleDB"); - this.pageFile.setWriteBatchSize(1); - this.pageFile.load(); - - this.pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - Page page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metaData); - metaData.page = page; - metaData.createIndexes(tx); - tx.store(metaData.page, metaDataMarshaller, true); - - } else { - Page page = tx.load(0, metaDataMarshaller); - metaData = page.get(); - metaData.page = page; - } - metaData.load(tx); - metaData.loadScheduler(tx, schedulers); - for (JobSchedulerImpl js :schedulers.values()) { - try { - js.start(); - } catch (Exception e) { - JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e); - } - } - } - }); - - this.pageFile.flush(); - LOG.info(this + " started"); - } - - @Override - protected synchronized void doStop(ServiceStopper stopper) throws Exception { - for (JobSchedulerImpl js : this.schedulers.values()) { - js.stop(); - } - if (this.pageFile != null) { - this.pageFile.unload(); - } - if (this.journal != null) { - journal.close(); - } - if (this.lockFile != null) { - this.lockFile.unlock(); - } - this.lockFile = null; - LOG.info(this + " stopped"); - - } - - synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { - int logId = location.getDataFileId(); - Integer val = this.metaData.journalRC.get(tx, logId); - int refCount = val != null ? val.intValue() + 1 : 1; - this.metaData.journalRC.put(tx, logId, refCount); - - } - - synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { - int logId = location.getDataFileId(); - int refCount = this.metaData.journalRC.get(tx, logId); - refCount--; - if (refCount <= 0) { - this.metaData.journalRC.remove(tx, logId); - Set set = new HashSet(); - set.add(logId); - this.journal.removeDataFiles(set); - } else { - this.metaData.journalRC.put(tx, logId, refCount); - } - - } - - synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { - ByteSequence result = null; - result = this.journal.read(location); - return result; - } - - synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { - return this.journal.write(payload, sync); - } - - private void lock() throws IOException { - if (lockFile == null) { - File lockFileName = new File(directory, "lock"); - lockFile = new LockFile(lockFileName, true); - if (failIfDatabaseIsLocked) { - lockFile.lock(); - } else { - while (true) { - try { - lockFile.lock(); - break; - } catch (IOException e) { - LOG.info("Database " + lockFileName + " is locked... waiting " - + (DATABASE_LOCKED_WAIT_DELAY / 1000) - + " seconds for the database to be unlocked. Reason: " + e); - try { - Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); - } catch (InterruptedException e1) { - } - } - } - } - } - } - - PageFile getPageFile() { - this.pageFile.isLoaded(); - return this.pageFile; - } - - public boolean isFailIfDatabaseIsLocked() { - return failIfDatabaseIsLocked; - } - - public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { - this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; - } - - public int getJournalMaxFileLength() { - return journalMaxFileLength; - } - - public void setJournalMaxFileLength(int journalMaxFileLength) { - this.journalMaxFileLength = journalMaxFileLength; - } - - public int getJournalMaxWriteBatchSize() { - return journalMaxWriteBatchSize; - } - - public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { - this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; - } - - public boolean isEnableIndexWriteAsync() { - return enableIndexWriteAsync; - } - - public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { - this.enableIndexWriteAsync = enableIndexWriteAsync; - } - - @Override - public String toString() { - return "JobSchedulerStore:" + this.directory; - } + JobScheduler getJobScheduler(String name) throws Exception; + boolean removeJobScheduler(String name) throws Exception; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java new file mode 100644 index 0000000000..b1cc7cf105 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * @author Hiram Chirino + */ +public class JobSupport { + public static String getDateTime(long value) { + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = new Date(value); + return dateFormat.format(date); + } + + public static long getDataTime(String value) throws Exception { + DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date = dfm.parse(value); + return date.getTime(); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 0bdcbf2536..af2569f492 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -268,7 +268,8 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { private JobSchedulerStore getStore() throws Exception { if (started.get()) { if (this.store == null) { - this.store = new JobSchedulerStore(); + String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl"; + this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance(); this.store.setDirectory(directory); this.store.start(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java b/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java index 844d4b32a9..9ca5558acc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java @@ -1,7 +1,6 @@ package org.apache.activemq.store; import org.apache.activemq.Service; -import org.apache.activemq.store.kahadb.plist.PListImpl; import java.io.File; @@ -13,7 +12,7 @@ public interface PListStore extends Service { void setDirectory(File directory); - PListImpl getPList(String name) throws Exception; + PList getPList(String name) throws Exception; boolean removePList(String name) throws Exception; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java similarity index 76% rename from activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java rename to activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java index ec00cdbef0..46c9929190 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java @@ -14,11 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.broker.scheduler; +package org.apache.activemq.store.kahadb.scheduler; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; + +import org.apache.activemq.broker.scheduler.Job; +import org.apache.activemq.broker.scheduler.JobSupport; import org.apache.activemq.util.ByteSequence; @@ -63,27 +66,12 @@ public class JobImpl implements Job { public String getNextExecutionTime() { - return JobImpl.getDateTime(this.jobLocation.getNextTime()); + return JobSupport.getDateTime(this.jobLocation.getNextTime()); } public String getStartTime() { - return JobImpl.getDateTime(getStart()); - } - - public static long getDataTime(String value) throws Exception { - DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - Date date = dfm.parse(value); - return date.getTime(); - } - - public static String getDateTime(long value) { - DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date = new Date(value); - return dateFormat.format(date); + return JobSupport.getDateTime(getStart()); } - - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java similarity index 99% rename from activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java rename to activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java index c6207d83dd..0acaa7c82b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.broker.scheduler; +package org.apache.activemq.store.kahadb.scheduler; import java.io.DataInput; import java.io.DataOutput; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java similarity index 98% rename from activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java rename to activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 511d364332..2e37810375 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.broker.scheduler; +package org.apache.activemq.store.kahadb.scheduler; import java.io.DataInput; import java.io.DataOutput; @@ -28,6 +28,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.MessageFormatException; +import org.apache.activemq.broker.scheduler.CronParser; +import org.apache.activemq.broker.scheduler.Job; +import org.apache.activemq.broker.scheduler.JobListener; +import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; @@ -42,7 +46,7 @@ import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler { private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class); - final JobSchedulerStore store; + final JobSchedulerStoreImpl store; private final AtomicBoolean running = new AtomicBoolean(); private String name; BTreeIndex> index; @@ -51,7 +55,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final ScheduleTime scheduleTime = new ScheduleTime(); - JobSchedulerImpl(JobSchedulerStore store) { + JobSchedulerImpl(JobSchedulerStoreImpl store) { this.store = store; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java new file mode 100644 index 0000000000..7811948915 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.scheduler; + +import org.apache.activemq.broker.scheduler.JobScheduler; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Page; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller; +import org.apache.activemq.util.LockFile; +import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; +import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore { + static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class); + private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; + + public static final int CLOSED_STATE = 1; + public static final int OPEN_STATE = 2; + + private File directory; + PageFile pageFile; + private Journal journal; + private LockFile lockFile; + private boolean failIfDatabaseIsLocked; + private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; + private boolean enableIndexWriteAsync = false; + // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + MetaData metaData = new MetaData(this); + final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); + Map schedulers = new HashMap(); + + protected class MetaData { + protected MetaData(JobSchedulerStoreImpl store) { + this.store = store; + } + private final JobSchedulerStoreImpl store; + Page page; + BTreeIndex journalRC; + BTreeIndex storedSchedulers; + + void createIndexes(Transaction tx) throws IOException { + this.storedSchedulers = new BTreeIndex(pageFile, tx.allocate().getPageId()); + this.journalRC = new BTreeIndex(pageFile, tx.allocate().getPageId()); + } + + void load(Transaction tx) throws IOException { + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.storedSchedulers.load(tx); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.load(tx); + } + + void loadScheduler(Transaction tx, Map schedulers) throws IOException { + for (Iterator> i = this.storedSchedulers.iterator(tx); i.hasNext();) { + Entry entry = i.next(); + entry.getValue().load(tx); + schedulers.put(entry.getKey(), entry.getValue()); + } + } + + public void read(DataInput is) throws IOException { + this.storedSchedulers = new BTreeIndex(pageFile, is.readLong()); + this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); + this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); + this.journalRC = new BTreeIndex(pageFile, is.readLong()); + this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); + this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); + } + + public void write(DataOutput os) throws IOException { + os.writeLong(this.storedSchedulers.getPageId()); + os.writeLong(this.journalRC.getPageId()); + + } + } + + class MetaDataMarshaller extends VariableMarshaller { + private final JobSchedulerStoreImpl store; + + MetaDataMarshaller(JobSchedulerStoreImpl store) { + this.store = store; + } + public MetaData readPayload(DataInput dataIn) throws IOException { + MetaData rc = new MetaData(this.store); + rc.read(dataIn); + return rc; + } + + public void writePayload(MetaData object, DataOutput dataOut) throws IOException { + object.write(dataOut); + } + } + + class ValueMarshaller extends VariableMarshaller> { + public List readPayload(DataInput dataIn) throws IOException { + List result = new ArrayList(); + int size = dataIn.readInt(); + for (int i = 0; i < size; i++) { + JobLocation jobLocation = new JobLocation(); + jobLocation.readExternal(dataIn); + result.add(jobLocation); + } + return result; + } + + public void writePayload(List value, DataOutput dataOut) throws IOException { + dataOut.writeInt(value.size()); + for (JobLocation jobLocation : value) { + jobLocation.writeExternal(dataOut); + } + } + } + + class JobSchedulerMarshaller extends VariableMarshaller { + private final JobSchedulerStoreImpl store; + JobSchedulerMarshaller(JobSchedulerStoreImpl store) { + this.store = store; + } + public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { + JobSchedulerImpl result = new JobSchedulerImpl(this.store); + result.read(dataIn); + return result; + } + + public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { + js.write(dataOut); + } + } + + @Override + public File getDirectory() { + return directory; + } + + @Override + public void setDirectory(File directory) { + this.directory = directory; + } + + @Override + public long size() { + if ( !isStarted() ) { + return 0; + } + try { + return journal.getDiskSize() + pageFile.getDiskSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public JobScheduler getJobScheduler(final String name) throws Exception { + JobSchedulerImpl result = this.schedulers.get(name); + if (result == null) { + final JobSchedulerImpl js = new JobSchedulerImpl(this); + js.setName(name); + getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + js.createIndexes(tx); + js.load(tx); + metaData.storedSchedulers.put(tx, name, js); + } + }); + result = js; + this.schedulers.put(name, js); + if (isStarted()) { + result.start(); + } + this.pageFile.flush(); + } + return result; + } + + @Override + synchronized public boolean removeJobScheduler(final String name) throws Exception { + boolean result = false; + final JobSchedulerImpl js = this.schedulers.remove(name); + result = js != null; + if (result) { + js.stop(); + getPageFile().tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + metaData.storedSchedulers.remove(tx, name); + js.destroy(tx); + } + }); + } + return result; + } + + @Override + protected synchronized void doStart() throws Exception { + if (this.directory == null) { + this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); + } + IOHelper.mkdirs(this.directory); + lock(); + this.journal = new Journal(); + this.journal.setDirectory(directory); + this.journal.setMaxFileLength(getJournalMaxFileLength()); + this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); + this.journal.start(); + this.pageFile = new PageFile(directory, "scheduleDB"); + this.pageFile.setWriteBatchSize(1); + this.pageFile.load(); + + this.pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + Page page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metaData); + metaData.page = page; + metaData.createIndexes(tx); + tx.store(metaData.page, metaDataMarshaller, true); + + } else { + Page page = tx.load(0, metaDataMarshaller); + metaData = page.get(); + metaData.page = page; + } + metaData.load(tx); + metaData.loadScheduler(tx, schedulers); + for (JobSchedulerImpl js :schedulers.values()) { + try { + js.start(); + } catch (Exception e) { + JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(),e); + } + } + } + }); + + this.pageFile.flush(); + LOG.info(this + " started"); + } + + @Override + protected synchronized void doStop(ServiceStopper stopper) throws Exception { + for (JobSchedulerImpl js : this.schedulers.values()) { + js.stop(); + } + if (this.pageFile != null) { + this.pageFile.unload(); + } + if (this.journal != null) { + journal.close(); + } + if (this.lockFile != null) { + this.lockFile.unlock(); + } + this.lockFile = null; + LOG.info(this + " stopped"); + + } + + synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + Integer val = this.metaData.journalRC.get(tx, logId); + int refCount = val != null ? val.intValue() + 1 : 1; + this.metaData.journalRC.put(tx, logId, refCount); + + } + + synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { + int logId = location.getDataFileId(); + int refCount = this.metaData.journalRC.get(tx, logId); + refCount--; + if (refCount <= 0) { + this.metaData.journalRC.remove(tx, logId); + Set set = new HashSet(); + set.add(logId); + this.journal.removeDataFiles(set); + } else { + this.metaData.journalRC.put(tx, logId, refCount); + } + + } + + synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { + ByteSequence result = null; + result = this.journal.read(location); + return result; + } + + synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { + return this.journal.write(payload, sync); + } + + private void lock() throws IOException { + if (lockFile == null) { + File lockFileName = new File(directory, "lock"); + lockFile = new LockFile(lockFileName, true); + if (failIfDatabaseIsLocked) { + lockFile.lock(); + } else { + while (true) { + try { + lockFile.lock(); + break; + } catch (IOException e) { + LOG.info("Database " + lockFileName + " is locked... waiting " + + (DATABASE_LOCKED_WAIT_DELAY / 1000) + + " seconds for the database to be unlocked. Reason: " + e); + try { + Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); + } catch (InterruptedException e1) { + } + } + } + } + } + } + + PageFile getPageFile() { + this.pageFile.isLoaded(); + return this.pageFile; + } + + public boolean isFailIfDatabaseIsLocked() { + return failIfDatabaseIsLocked; + } + + public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { + this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; + } + + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } + + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; + } + + public int getJournalMaxWriteBatchSize() { + return journalMaxWriteBatchSize; + } + + public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { + this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; + } + + public boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } + + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } + + @Override + public String toString() { + return "JobSchedulerStore:" + this.directory; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java index 7a72e801ca..c3d6e877fe 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.scheduler; import junit.framework.TestCase; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ByteSequence; @@ -27,7 +28,7 @@ import java.util.List; public class JobSchedulerStoreTest extends TestCase { public void testRestart() throws Exception { - JobSchedulerStore store = new JobSchedulerStore(); + JobSchedulerStore store = new JobSchedulerStoreImpl(); File directory = new File("target/test/ScheduledDB"); IOHelper.mkdirs(directory); IOHelper.deleteChildren(directory); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java index 236390ed5a..f91f49f4e2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.ByteSequence; import org.junit.After; @@ -240,7 +241,7 @@ public class JobSchedulerTest { } protected void startStore(File directory) throws Exception { - store = new JobSchedulerStore(); + store = new JobSchedulerStoreImpl(); store.setDirectory(directory); store.start(); scheduler = store.getJobScheduler("test");