Rework how IDs are calculated for quartz jobs
This commit is contained in:
parent
e6d3897700
commit
6985f3e5f3
|
@ -20,16 +20,23 @@ package ca.uhn.fhir.jpa.sched;
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import org.hl7.fhir.r5.model.InstantType;
|
||||||
|
import org.hl7.fhir.utilities.DateTimeUtil;
|
||||||
import org.quartz.spi.TriggerFiredBundle;
|
import org.quartz.spi.TriggerFiredBundle;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
|
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
|
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
|
public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
|
||||||
|
|
||||||
private transient AutowireCapableBeanFactory myBeanFactory;
|
private transient AutowireCapableBeanFactory myBeanFactory;
|
||||||
private ApplicationContext myAppCtx;
|
private ApplicationContext myAppCtx;
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(AutowiringSpringBeanJobFactory.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setApplicationContext(final ApplicationContext theApplicationContext) {
|
public void setApplicationContext(final ApplicationContext theApplicationContext) {
|
||||||
|
@ -39,6 +46,13 @@ public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory impleme
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
|
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
|
||||||
|
|
||||||
|
String prev = toString(bundle.getPrevFireTime());
|
||||||
|
String scheduled = toString(bundle.getScheduledFireTime());
|
||||||
|
String next = toString(bundle.getNextFireTime());
|
||||||
|
String fireInstanceId = bundle.getTrigger().getFireInstanceId();
|
||||||
|
ourLog.info("Firing job[{}] ID[{}] - Previous[{}] Scheduled[{}] Next[{}]", bundle.getJobDetail().getKey(), fireInstanceId, prev, scheduled, next);
|
||||||
|
|
||||||
Object job = super.createJobInstance(bundle);
|
Object job = super.createJobInstance(bundle);
|
||||||
myBeanFactory.autowireBean(job);
|
myBeanFactory.autowireBean(job);
|
||||||
if (job instanceof ApplicationContextAware) {
|
if (job instanceof ApplicationContextAware) {
|
||||||
|
@ -46,4 +60,11 @@ public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory impleme
|
||||||
}
|
}
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String toString(Date theDate) {
|
||||||
|
if (theDate == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new InstantType(theDate).getValueAsString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,10 +156,9 @@ public abstract class BaseHapiScheduler implements IHapiScheduler {
|
||||||
Validate.notNull(theJobDefinition.getJobClass());
|
Validate.notNull(theJobDefinition.getJobClass());
|
||||||
Validate.notBlank(theJobDefinition.getId());
|
Validate.notBlank(theJobDefinition.getId());
|
||||||
|
|
||||||
JobKey jobKey;
|
JobKey jobKey = new JobKey(theJobDefinition.getId(), theJobDefinition.getGroup());
|
||||||
|
TriggerKey triggerKey = new TriggerKey(theJobDefinition.getId(), theJobDefinition.getGroup());
|
||||||
jobKey = new JobKey(theJobDefinition.getId(), theJobDefinition.getGroup());
|
|
||||||
|
|
||||||
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
|
JobDetailImpl jobDetail = new NonConcurrentJobDetailImpl();
|
||||||
jobDetail.setJobClass(theJobDefinition.getJobClass());
|
jobDetail.setJobClass(theJobDefinition.getJobClass());
|
||||||
jobDetail.setKey(jobKey);
|
jobDetail.setKey(jobKey);
|
||||||
|
@ -172,6 +171,7 @@ public abstract class BaseHapiScheduler implements IHapiScheduler {
|
||||||
|
|
||||||
Trigger trigger = TriggerBuilder.newTrigger()
|
Trigger trigger = TriggerBuilder.newTrigger()
|
||||||
.forJob(jobDetail)
|
.forJob(jobDetail)
|
||||||
|
.withIdentity(triggerKey)
|
||||||
.startNow()
|
.startNow()
|
||||||
.withSchedule(schedule)
|
.withSchedule(schedule)
|
||||||
.build();
|
.build();
|
||||||
|
|
|
@ -386,7 +386,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
|
||||||
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
|
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
|
||||||
jobDetail.setId(getClass().getName());
|
jobDetail.setId(getClass().getName());
|
||||||
jobDetail.setJobClass(Job.class);
|
jobDetail.setJobClass(Job.class);
|
||||||
mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_SECOND, jobDetail);
|
mySchedulerService.scheduleLocalJob(60 * DateUtils.MILLIS_PER_SECOND, jobDetail);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Job implements HapiJob {
|
public static class Job implements HapiJob {
|
||||||
|
|
|
@ -264,7 +264,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
|
||||||
// Register scheduled job to save deferred concepts
|
// Register scheduled job to save deferred concepts
|
||||||
// In the future it would be great to make this a cluster-aware task somehow
|
// In the future it would be great to make this a cluster-aware task somehow
|
||||||
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
|
ScheduledJobDefinition jobDefinition = new ScheduledJobDefinition();
|
||||||
jobDefinition.setId(this.getClass().getName());
|
jobDefinition.setId(Job.class.getName());
|
||||||
jobDefinition.setJobClass(Job.class);
|
jobDefinition.setJobClass(Job.class);
|
||||||
mySchedulerService.scheduleLocalJob(5000, jobDefinition);
|
mySchedulerService.scheduleLocalJob(5000, jobDefinition);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,12 @@ import org.hl7.fhir.dstu3.model.InstantType;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@ -45,7 +50,14 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
|
||||||
|
|
||||||
private static final int CAPACITY = 1000;
|
private static final int CAPACITY = 1000;
|
||||||
private static final Logger ourLog = LoggerFactory.getLogger(CircularQueueCaptureQueriesListener.class);
|
private static final Logger ourLog = LoggerFactory.getLogger(CircularQueueCaptureQueriesListener.class);
|
||||||
private final Queue<SqlQuery> myQueries = Queues.synchronizedQueue(new CircularFifoQueue<>(CAPACITY));
|
private Queue<SqlQuery> myQueries;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public CircularQueueCaptureQueriesListener() {
|
||||||
|
startCollecting();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Queue<SqlQuery> provideQueryList() {
|
protected Queue<SqlQuery> provideQueryList() {
|
||||||
|
@ -59,6 +71,20 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
|
||||||
myQueries.clear();
|
myQueries.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start collecting queries (this is the default)
|
||||||
|
*/
|
||||||
|
public void startCollecting() {
|
||||||
|
myQueries = Queues.synchronizedQueue(new CircularFifoQueue<>(CAPACITY));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop collecting queries and discard any collected ones
|
||||||
|
*/
|
||||||
|
public void stopCollecting() {
|
||||||
|
myQueries = null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Index 0 is oldest
|
* Index 0 is oldest
|
||||||
*/
|
*/
|
||||||
|
@ -316,7 +342,7 @@ public class CircularQueueCaptureQueriesListener extends BaseCaptureQueriesListe
|
||||||
b.append("\nStack:\n ");
|
b.append("\nStack:\n ");
|
||||||
Stream<String> stackTraceStream = Arrays.stream(theQuery.getStackTrace())
|
Stream<String> stackTraceStream = Arrays.stream(theQuery.getStackTrace())
|
||||||
.map(StackTraceElement::toString)
|
.map(StackTraceElement::toString)
|
||||||
.filter(t->t.startsWith("ca."));
|
.filter(t -> t.startsWith("ca."));
|
||||||
b.append(stackTraceStream.collect(Collectors.joining("\n ")));
|
b.append(stackTraceStream.collect(Collectors.joining("\n ")));
|
||||||
}
|
}
|
||||||
b.append("\n");
|
b.append("\n");
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
package ca.uhn.fhir.jpa.util;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class CircularQueueCaptureQueriesListenerTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStopCollecting() {
|
||||||
|
|
||||||
|
CircularQueueCaptureQueriesListener collector = new CircularQueueCaptureQueriesListener();
|
||||||
|
collector.
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue