mirror of https://github.com/apache/activemq.git
Keep the broker.scheduler package free of kahadb impl specifics.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1406371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d5dd937b81
commit
0484af1c61
|
@ -1136,11 +1136,11 @@
|
|||
<exclude>org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.*</exclude>
|
||||
<exclude>org/apache/activemq/xbean/XBeanXmlTest.*</exclude>
|
||||
<exclude>org/apache/bugs/AMQ1730Test.*</exclude>
|
||||
<exclude>org/apache/bugs/LoadBalanceTest.*</exclude>
|
||||
<exclude>org/apache/kahadb/index/BTreeIndexTest.*</exclude>
|
||||
<exclude>org/apache/kahadb/index/HashIndexTest.*</exclude>
|
||||
<exclude>org/apache/kahadb/index/ListIndexTest.*</exclude>
|
||||
<exclude>org/apache/kahadb/util/DataByteArrayInputStreamTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/bugs/LoadBalanceTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude>
|
||||
<exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -16,17 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
||||
import org.apache.activemq.broker.scheduler.Job;
|
||||
import org.apache.activemq.broker.scheduler.JobImpl;
|
||||
import org.apache.activemq.broker.scheduler.JobScheduler;
|
||||
import org.apache.activemq.broker.scheduler.JobSupport;
|
||||
|
||||
import javax.management.openmbean.*;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class JobSchedulerView implements JobSchedulerViewMBean {
|
||||
|
||||
|
@ -53,8 +50,8 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
|
|||
CompositeType ct = factory.getCompositeType();
|
||||
TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
|
||||
TabularDataSupport rc = new TabularDataSupport(tt);
|
||||
long start = JobImpl.getDataTime(startTime);
|
||||
long finish = JobImpl.getDataTime(finishTime);
|
||||
long start = JobSupport.getDataTime(startTime);
|
||||
long finish = JobSupport.getDataTime(finishTime);
|
||||
List<Job> jobs = this.jobScheduler.getAllJobs(start, finish);
|
||||
for (Job job : jobs) {
|
||||
rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
|
||||
|
@ -76,7 +73,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
|
|||
|
||||
public String getNextScheduleTime() throws Exception {
|
||||
long time = this.jobScheduler.getNextScheduleTime();
|
||||
return JobImpl.getDateTime(time);
|
||||
return JobSupport.getDateTime(time);
|
||||
}
|
||||
|
||||
public void removeAllJobs() throws Exception {
|
||||
|
@ -85,8 +82,8 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
|
|||
}
|
||||
|
||||
public void removeAllJobs(String startTime, String finishTime) throws Exception {
|
||||
long start = JobImpl.getDataTime(startTime);
|
||||
long finish = JobImpl.getDataTime(finishTime);
|
||||
long start = JobSupport.getDataTime(startTime);
|
||||
long finish = JobSupport.getDataTime(finishTime);
|
||||
this.jobScheduler.removeAllJobs(start, finish);
|
||||
|
||||
}
|
||||
|
|
|
@ -1,392 +1,20 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.scheduler;
|
||||
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Location;
|
||||
import org.apache.activemq.store.kahadb.disk.page.Page;
|
||||
import org.apache.activemq.store.kahadb.disk.page.PageFile;
|
||||
import org.apache.activemq.store.kahadb.disk.page.Transaction;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
|
||||
import org.apache.activemq.util.LockFile;
|
||||
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
|
||||
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.activemq.Service;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
public class JobSchedulerStore extends ServiceSupport {
|
||||
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
|
||||
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
|
||||
/**
|
||||
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||
*/
|
||||
public interface JobSchedulerStore extends Service {
|
||||
File getDirectory();
|
||||
|
||||
public static final int CLOSED_STATE = 1;
|
||||
public static final int OPEN_STATE = 2;
|
||||
void setDirectory(File directory);
|
||||
|
||||
private File directory;
|
||||
PageFile pageFile;
|
||||
private Journal journal;
|
||||
private LockFile lockFile;
|
||||
private boolean failIfDatabaseIsLocked;
|
||||
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
|
||||
private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||
private boolean enableIndexWriteAsync = false;
|
||||
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
|
||||
MetaData metaData = new MetaData(this);
|
||||
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
|
||||
Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
|
||||
long size();
|
||||
|
||||
protected class MetaData {
|
||||
protected MetaData(JobSchedulerStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
private final JobSchedulerStore store;
|
||||
Page<MetaData> page;
|
||||
BTreeIndex<Integer, Integer> journalRC;
|
||||
BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
|
||||
|
||||
void createIndexes(Transaction tx) throws IOException {
|
||||
this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
|
||||
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
|
||||
}
|
||||
|
||||
void load(Transaction tx) throws IOException {
|
||||
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
|
||||
this.storedSchedulers.load(tx);
|
||||
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
|
||||
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
|
||||
this.journalRC.load(tx);
|
||||
}
|
||||
|
||||
void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
|
||||
for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
|
||||
Entry<String, JobSchedulerImpl> entry = i.next();
|
||||
entry.getValue().load(tx);
|
||||
schedulers.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void read(DataInput is) throws IOException {
|
||||
this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
|
||||
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
|
||||
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
|
||||
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
|
||||
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
|
||||
}
|
||||
|
||||
public void write(DataOutput os) throws IOException {
|
||||
os.writeLong(this.storedSchedulers.getPageId());
|
||||
os.writeLong(this.journalRC.getPageId());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
class MetaDataMarshaller extends VariableMarshaller<MetaData> {
|
||||
private final JobSchedulerStore store;
|
||||
|
||||
MetaDataMarshaller(JobSchedulerStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
public MetaData readPayload(DataInput dataIn) throws IOException {
|
||||
MetaData rc = new MetaData(this.store);
|
||||
rc.read(dataIn);
|
||||
return rc;
|
||||
}
|
||||
|
||||
public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
|
||||
object.write(dataOut);
|
||||
}
|
||||
}
|
||||
|
||||
class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
|
||||
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
|
||||
List<JobLocation> result = new ArrayList<JobLocation>();
|
||||
int size = dataIn.readInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
JobLocation jobLocation = new JobLocation();
|
||||
jobLocation.readExternal(dataIn);
|
||||
result.add(jobLocation);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
|
||||
dataOut.writeInt(value.size());
|
||||
for (JobLocation jobLocation : value) {
|
||||
jobLocation.writeExternal(dataOut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
|
||||
private final JobSchedulerStore store;
|
||||
JobSchedulerMarshaller(JobSchedulerStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
|
||||
JobSchedulerImpl result = new JobSchedulerImpl(this.store);
|
||||
result.read(dataIn);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
|
||||
js.write(dataOut);
|
||||
}
|
||||
}
|
||||
|
||||
public File getDirectory() {
|
||||
return directory;
|
||||
}
|
||||
|
||||
public void setDirectory(File directory) {
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
public long size() {
|
||||
if ( !isStarted() ) {
|
||||
return 0;
|
||||
}
|
||||
try {
|
||||
return journal.getDiskSize() + pageFile.getDiskSize();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public JobScheduler getJobScheduler(final String name) throws Exception {
|
||||
JobSchedulerImpl result = this.schedulers.get(name);
|
||||
if (result == null) {
|
||||
final JobSchedulerImpl js = new JobSchedulerImpl(this);
|
||||
js.setName(name);
|
||||
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
js.createIndexes(tx);
|
||||
js.load(tx);
|
||||
metaData.storedSchedulers.put(tx, name, js);
|
||||
}
|
||||
});
|
||||
result = js;
|
||||
this.schedulers.put(name, js);
|
||||
if (isStarted()) {
|
||||
result.start();
|
||||
}
|
||||
this.pageFile.flush();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
synchronized public boolean removeJobScheduler(final String name) throws Exception {
|
||||
boolean result = false;
|
||||
final JobSchedulerImpl js = this.schedulers.remove(name);
|
||||
result = js != null;
|
||||
if (result) {
|
||||
js.stop();
|
||||
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
metaData.storedSchedulers.remove(tx, name);
|
||||
js.destroy(tx);
|
||||
}
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doStart() throws Exception {
|
||||
if (this.directory == null) {
|
||||
this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
|
||||
}
|
||||
IOHelper.mkdirs(this.directory);
|
||||
lock();
|
||||
this.journal = new Journal();
|
||||
this.journal.setDirectory(directory);
|
||||
this.journal.setMaxFileLength(getJournalMaxFileLength());
|
||||
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
|
||||
this.journal.start();
|
||||
this.pageFile = new PageFile(directory, "scheduleDB");
|
||||
this.pageFile.setWriteBatchSize(1);
|
||||
this.pageFile.load();
|
||||
|
||||
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
if (pageFile.getPageCount() == 0) {
|
||||
Page<MetaData> page = tx.allocate();
|
||||
assert page.getPageId() == 0;
|
||||
page.set(metaData);
|
||||
metaData.page = page;
|
||||
metaData.createIndexes(tx);
|
||||
tx.store(metaData.page, metaDataMarshaller, true);
|
||||
|
||||
} else {
|
||||
Page<MetaData> page = tx.load(0, metaDataMarshaller);
|
||||
metaData = page.get();
|
||||
metaData.page = page;
|
||||
}
|
||||
metaData.load(tx);
|
||||
metaData.loadScheduler(tx, schedulers);
|
||||
for (JobSchedulerImpl js :schedulers.values()) {
|
||||
try {
|
||||
js.start();
|
||||
} catch (Exception e) {
|
||||
JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.pageFile.flush();
|
||||
LOG.info(this + " started");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
|
||||
for (JobSchedulerImpl js : this.schedulers.values()) {
|
||||
js.stop();
|
||||
}
|
||||
if (this.pageFile != null) {
|
||||
this.pageFile.unload();
|
||||
}
|
||||
if (this.journal != null) {
|
||||
journal.close();
|
||||
}
|
||||
if (this.lockFile != null) {
|
||||
this.lockFile.unlock();
|
||||
}
|
||||
this.lockFile = null;
|
||||
LOG.info(this + " stopped");
|
||||
|
||||
}
|
||||
|
||||
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
|
||||
int logId = location.getDataFileId();
|
||||
Integer val = this.metaData.journalRC.get(tx, logId);
|
||||
int refCount = val != null ? val.intValue() + 1 : 1;
|
||||
this.metaData.journalRC.put(tx, logId, refCount);
|
||||
|
||||
}
|
||||
|
||||
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
|
||||
int logId = location.getDataFileId();
|
||||
int refCount = this.metaData.journalRC.get(tx, logId);
|
||||
refCount--;
|
||||
if (refCount <= 0) {
|
||||
this.metaData.journalRC.remove(tx, logId);
|
||||
Set<Integer> set = new HashSet<Integer>();
|
||||
set.add(logId);
|
||||
this.journal.removeDataFiles(set);
|
||||
} else {
|
||||
this.metaData.journalRC.put(tx, logId, refCount);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
|
||||
ByteSequence result = null;
|
||||
result = this.journal.read(location);
|
||||
return result;
|
||||
}
|
||||
|
||||
synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
|
||||
return this.journal.write(payload, sync);
|
||||
}
|
||||
|
||||
private void lock() throws IOException {
|
||||
if (lockFile == null) {
|
||||
File lockFileName = new File(directory, "lock");
|
||||
lockFile = new LockFile(lockFileName, true);
|
||||
if (failIfDatabaseIsLocked) {
|
||||
lockFile.lock();
|
||||
} else {
|
||||
while (true) {
|
||||
try {
|
||||
lockFile.lock();
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
LOG.info("Database " + lockFileName + " is locked... waiting "
|
||||
+ (DATABASE_LOCKED_WAIT_DELAY / 1000)
|
||||
+ " seconds for the database to be unlocked. Reason: " + e);
|
||||
try {
|
||||
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
|
||||
} catch (InterruptedException e1) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PageFile getPageFile() {
|
||||
this.pageFile.isLoaded();
|
||||
return this.pageFile;
|
||||
}
|
||||
|
||||
public boolean isFailIfDatabaseIsLocked() {
|
||||
return failIfDatabaseIsLocked;
|
||||
}
|
||||
|
||||
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
|
||||
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
|
||||
}
|
||||
|
||||
public int getJournalMaxFileLength() {
|
||||
return journalMaxFileLength;
|
||||
}
|
||||
|
||||
public void setJournalMaxFileLength(int journalMaxFileLength) {
|
||||
this.journalMaxFileLength = journalMaxFileLength;
|
||||
}
|
||||
|
||||
public int getJournalMaxWriteBatchSize() {
|
||||
return journalMaxWriteBatchSize;
|
||||
}
|
||||
|
||||
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
|
||||
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
|
||||
}
|
||||
|
||||
public boolean isEnableIndexWriteAsync() {
|
||||
return enableIndexWriteAsync;
|
||||
}
|
||||
|
||||
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
|
||||
this.enableIndexWriteAsync = enableIndexWriteAsync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JobSchedulerStore:" + this.directory;
|
||||
}
|
||||
JobScheduler getJobScheduler(String name) throws Exception;
|
||||
|
||||
boolean removeJobScheduler(String name) throws Exception;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.scheduler;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||
*/
|
||||
public class JobSupport {
|
||||
public static String getDateTime(long value) {
|
||||
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Date date = new Date(value);
|
||||
return dateFormat.format(date);
|
||||
}
|
||||
|
||||
public static long getDataTime(String value) throws Exception {
|
||||
DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Date date = dfm.parse(value);
|
||||
return date.getTime();
|
||||
}
|
||||
|
||||
}
|
|
@ -268,7 +268,8 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
|
|||
private JobSchedulerStore getStore() throws Exception {
|
||||
if (started.get()) {
|
||||
if (this.store == null) {
|
||||
this.store = new JobSchedulerStore();
|
||||
String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
|
||||
this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
|
||||
this.store.setDirectory(directory);
|
||||
this.store.start();
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package org.apache.activemq.store;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.store.kahadb.plist.PListImpl;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
|
@ -13,7 +12,7 @@ public interface PListStore extends Service {
|
|||
|
||||
void setDirectory(File directory);
|
||||
|
||||
PListImpl getPList(String name) throws Exception;
|
||||
PList getPList(String name) throws Exception;
|
||||
|
||||
boolean removePList(String name) throws Exception;
|
||||
|
||||
|
|
|
@ -14,11 +14,14 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.scheduler;
|
||||
package org.apache.activemq.store.kahadb.scheduler;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
import org.apache.activemq.broker.scheduler.Job;
|
||||
import org.apache.activemq.broker.scheduler.JobSupport;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
||||
|
||||
|
@ -63,27 +66,12 @@ public class JobImpl implements Job {
|
|||
|
||||
|
||||
public String getNextExecutionTime() {
|
||||
return JobImpl.getDateTime(this.jobLocation.getNextTime());
|
||||
return JobSupport.getDateTime(this.jobLocation.getNextTime());
|
||||
}
|
||||
|
||||
public String getStartTime() {
|
||||
return JobImpl.getDateTime(getStart());
|
||||
return JobSupport.getDateTime(getStart());
|
||||
}
|
||||
|
||||
public static long getDataTime(String value) throws Exception {
|
||||
DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
Date date = dfm.parse(value);
|
||||
return date.getTime();
|
||||
}
|
||||
|
||||
public static String getDateTime(long value) {
|
||||
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
Date date = new Date(value);
|
||||
return dateFormat.format(date);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.scheduler;
|
||||
package org.apache.activemq.store.kahadb.scheduler;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.scheduler;
|
||||
package org.apache.activemq.store.kahadb.scheduler;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
@ -28,6 +28,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
|
||||
import javax.jms.MessageFormatException;
|
||||
|
||||
import org.apache.activemq.broker.scheduler.CronParser;
|
||||
import org.apache.activemq.broker.scheduler.Job;
|
||||
import org.apache.activemq.broker.scheduler.JobListener;
|
||||
import org.apache.activemq.broker.scheduler.JobScheduler;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
|
@ -42,7 +46,7 @@ import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
|
|||
|
||||
class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
|
||||
final JobSchedulerStore store;
|
||||
final JobSchedulerStoreImpl store;
|
||||
private final AtomicBoolean running = new AtomicBoolean();
|
||||
private String name;
|
||||
BTreeIndex<Long, List<JobLocation>> index;
|
||||
|
@ -51,7 +55,7 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler
|
|||
private static final IdGenerator ID_GENERATOR = new IdGenerator();
|
||||
private final ScheduleTime scheduleTime = new ScheduleTime();
|
||||
|
||||
JobSchedulerImpl(JobSchedulerStore store) {
|
||||
JobSchedulerImpl(JobSchedulerStoreImpl store) {
|
||||
|
||||
this.store = store;
|
||||
}
|
|
@ -0,0 +1,399 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb.scheduler;
|
||||
|
||||
import org.apache.activemq.broker.scheduler.JobScheduler;
|
||||
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.activemq.util.ServiceSupport;
|
||||
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Journal;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Location;
|
||||
import org.apache.activemq.store.kahadb.disk.page.Page;
|
||||
import org.apache.activemq.store.kahadb.disk.page.PageFile;
|
||||
import org.apache.activemq.store.kahadb.disk.page.Transaction;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
|
||||
import org.apache.activemq.util.LockFile;
|
||||
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
|
||||
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
|
||||
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
|
||||
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
|
||||
|
||||
public static final int CLOSED_STATE = 1;
|
||||
public static final int OPEN_STATE = 2;
|
||||
|
||||
private File directory;
|
||||
PageFile pageFile;
|
||||
private Journal journal;
|
||||
private LockFile lockFile;
|
||||
private boolean failIfDatabaseIsLocked;
|
||||
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
|
||||
private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||
private boolean enableIndexWriteAsync = false;
|
||||
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
|
||||
MetaData metaData = new MetaData(this);
|
||||
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
|
||||
Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
|
||||
|
||||
protected class MetaData {
|
||||
protected MetaData(JobSchedulerStoreImpl store) {
|
||||
this.store = store;
|
||||
}
|
||||
private final JobSchedulerStoreImpl store;
|
||||
Page<MetaData> page;
|
||||
BTreeIndex<Integer, Integer> journalRC;
|
||||
BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
|
||||
|
||||
void createIndexes(Transaction tx) throws IOException {
|
||||
this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
|
||||
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
|
||||
}
|
||||
|
||||
void load(Transaction tx) throws IOException {
|
||||
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
|
||||
this.storedSchedulers.load(tx);
|
||||
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
|
||||
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
|
||||
this.journalRC.load(tx);
|
||||
}
|
||||
|
||||
void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
|
||||
for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
|
||||
Entry<String, JobSchedulerImpl> entry = i.next();
|
||||
entry.getValue().load(tx);
|
||||
schedulers.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
public void read(DataInput is) throws IOException {
|
||||
this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
|
||||
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
|
||||
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
|
||||
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
|
||||
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
|
||||
}
|
||||
|
||||
public void write(DataOutput os) throws IOException {
|
||||
os.writeLong(this.storedSchedulers.getPageId());
|
||||
os.writeLong(this.journalRC.getPageId());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
class MetaDataMarshaller extends VariableMarshaller<MetaData> {
|
||||
private final JobSchedulerStoreImpl store;
|
||||
|
||||
MetaDataMarshaller(JobSchedulerStoreImpl store) {
|
||||
this.store = store;
|
||||
}
|
||||
public MetaData readPayload(DataInput dataIn) throws IOException {
|
||||
MetaData rc = new MetaData(this.store);
|
||||
rc.read(dataIn);
|
||||
return rc;
|
||||
}
|
||||
|
||||
public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
|
||||
object.write(dataOut);
|
||||
}
|
||||
}
|
||||
|
||||
class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
|
||||
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
|
||||
List<JobLocation> result = new ArrayList<JobLocation>();
|
||||
int size = dataIn.readInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
JobLocation jobLocation = new JobLocation();
|
||||
jobLocation.readExternal(dataIn);
|
||||
result.add(jobLocation);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
|
||||
dataOut.writeInt(value.size());
|
||||
for (JobLocation jobLocation : value) {
|
||||
jobLocation.writeExternal(dataOut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
|
||||
private final JobSchedulerStoreImpl store;
|
||||
JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
|
||||
this.store = store;
|
||||
}
|
||||
public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
|
||||
JobSchedulerImpl result = new JobSchedulerImpl(this.store);
|
||||
result.read(dataIn);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
|
||||
js.write(dataOut);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getDirectory() {
|
||||
return directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDirectory(File directory) {
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size() {
|
||||
if ( !isStarted() ) {
|
||||
return 0;
|
||||
}
|
||||
try {
|
||||
return journal.getDiskSize() + pageFile.getDiskSize();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public JobScheduler getJobScheduler(final String name) throws Exception {
|
||||
JobSchedulerImpl result = this.schedulers.get(name);
|
||||
if (result == null) {
|
||||
final JobSchedulerImpl js = new JobSchedulerImpl(this);
|
||||
js.setName(name);
|
||||
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
js.createIndexes(tx);
|
||||
js.load(tx);
|
||||
metaData.storedSchedulers.put(tx, name, js);
|
||||
}
|
||||
});
|
||||
result = js;
|
||||
this.schedulers.put(name, js);
|
||||
if (isStarted()) {
|
||||
result.start();
|
||||
}
|
||||
this.pageFile.flush();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
synchronized public boolean removeJobScheduler(final String name) throws Exception {
|
||||
boolean result = false;
|
||||
final JobSchedulerImpl js = this.schedulers.remove(name);
|
||||
result = js != null;
|
||||
if (result) {
|
||||
js.stop();
|
||||
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
metaData.storedSchedulers.remove(tx, name);
|
||||
js.destroy(tx);
|
||||
}
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doStart() throws Exception {
|
||||
if (this.directory == null) {
|
||||
this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
|
||||
}
|
||||
IOHelper.mkdirs(this.directory);
|
||||
lock();
|
||||
this.journal = new Journal();
|
||||
this.journal.setDirectory(directory);
|
||||
this.journal.setMaxFileLength(getJournalMaxFileLength());
|
||||
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
|
||||
this.journal.start();
|
||||
this.pageFile = new PageFile(directory, "scheduleDB");
|
||||
this.pageFile.setWriteBatchSize(1);
|
||||
this.pageFile.load();
|
||||
|
||||
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
if (pageFile.getPageCount() == 0) {
|
||||
Page<MetaData> page = tx.allocate();
|
||||
assert page.getPageId() == 0;
|
||||
page.set(metaData);
|
||||
metaData.page = page;
|
||||
metaData.createIndexes(tx);
|
||||
tx.store(metaData.page, metaDataMarshaller, true);
|
||||
|
||||
} else {
|
||||
Page<MetaData> page = tx.load(0, metaDataMarshaller);
|
||||
metaData = page.get();
|
||||
metaData.page = page;
|
||||
}
|
||||
metaData.load(tx);
|
||||
metaData.loadScheduler(tx, schedulers);
|
||||
for (JobSchedulerImpl js :schedulers.values()) {
|
||||
try {
|
||||
js.start();
|
||||
} catch (Exception e) {
|
||||
JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(),e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.pageFile.flush();
|
||||
LOG.info(this + " started");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
|
||||
for (JobSchedulerImpl js : this.schedulers.values()) {
|
||||
js.stop();
|
||||
}
|
||||
if (this.pageFile != null) {
|
||||
this.pageFile.unload();
|
||||
}
|
||||
if (this.journal != null) {
|
||||
journal.close();
|
||||
}
|
||||
if (this.lockFile != null) {
|
||||
this.lockFile.unlock();
|
||||
}
|
||||
this.lockFile = null;
|
||||
LOG.info(this + " stopped");
|
||||
|
||||
}
|
||||
|
||||
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
|
||||
int logId = location.getDataFileId();
|
||||
Integer val = this.metaData.journalRC.get(tx, logId);
|
||||
int refCount = val != null ? val.intValue() + 1 : 1;
|
||||
this.metaData.journalRC.put(tx, logId, refCount);
|
||||
|
||||
}
|
||||
|
||||
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
|
||||
int logId = location.getDataFileId();
|
||||
int refCount = this.metaData.journalRC.get(tx, logId);
|
||||
refCount--;
|
||||
if (refCount <= 0) {
|
||||
this.metaData.journalRC.remove(tx, logId);
|
||||
Set<Integer> set = new HashSet<Integer>();
|
||||
set.add(logId);
|
||||
this.journal.removeDataFiles(set);
|
||||
} else {
|
||||
this.metaData.journalRC.put(tx, logId, refCount);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
|
||||
ByteSequence result = null;
|
||||
result = this.journal.read(location);
|
||||
return result;
|
||||
}
|
||||
|
||||
synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
|
||||
return this.journal.write(payload, sync);
|
||||
}
|
||||
|
||||
private void lock() throws IOException {
|
||||
if (lockFile == null) {
|
||||
File lockFileName = new File(directory, "lock");
|
||||
lockFile = new LockFile(lockFileName, true);
|
||||
if (failIfDatabaseIsLocked) {
|
||||
lockFile.lock();
|
||||
} else {
|
||||
while (true) {
|
||||
try {
|
||||
lockFile.lock();
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
LOG.info("Database " + lockFileName + " is locked... waiting "
|
||||
+ (DATABASE_LOCKED_WAIT_DELAY / 1000)
|
||||
+ " seconds for the database to be unlocked. Reason: " + e);
|
||||
try {
|
||||
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
|
||||
} catch (InterruptedException e1) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PageFile getPageFile() {
|
||||
this.pageFile.isLoaded();
|
||||
return this.pageFile;
|
||||
}
|
||||
|
||||
public boolean isFailIfDatabaseIsLocked() {
|
||||
return failIfDatabaseIsLocked;
|
||||
}
|
||||
|
||||
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
|
||||
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
|
||||
}
|
||||
|
||||
public int getJournalMaxFileLength() {
|
||||
return journalMaxFileLength;
|
||||
}
|
||||
|
||||
public void setJournalMaxFileLength(int journalMaxFileLength) {
|
||||
this.journalMaxFileLength = journalMaxFileLength;
|
||||
}
|
||||
|
||||
public int getJournalMaxWriteBatchSize() {
|
||||
return journalMaxWriteBatchSize;
|
||||
}
|
||||
|
||||
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
|
||||
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
|
||||
}
|
||||
|
||||
public boolean isEnableIndexWriteAsync() {
|
||||
return enableIndexWriteAsync;
|
||||
}
|
||||
|
||||
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
|
||||
this.enableIndexWriteAsync = enableIndexWriteAsync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "JobSchedulerStore:" + this.directory;
|
||||
}
|
||||
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.broker.scheduler;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
||||
|
@ -27,7 +28,7 @@ import java.util.List;
|
|||
public class JobSchedulerStoreTest extends TestCase {
|
||||
|
||||
public void testRestart() throws Exception {
|
||||
JobSchedulerStore store = new JobSchedulerStore();
|
||||
JobSchedulerStore store = new JobSchedulerStoreImpl();
|
||||
File directory = new File("target/test/ScheduledDB");
|
||||
IOHelper.mkdirs(directory);
|
||||
IOHelper.deleteChildren(directory);
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.junit.After;
|
||||
|
@ -240,7 +241,7 @@ public class JobSchedulerTest {
|
|||
}
|
||||
|
||||
protected void startStore(File directory) throws Exception {
|
||||
store = new JobSchedulerStore();
|
||||
store = new JobSchedulerStoreImpl();
|
||||
store.setDirectory(directory);
|
||||
store.start();
|
||||
scheduler = store.getJobScheduler("test");
|
||||
|
|
Loading…
Reference in New Issue