how to trigger and stop scheduled batch job (#4671)

This commit is contained in:
mmchsusan 2018-07-10 02:09:31 -04:00 committed by maibin
parent ca1a1d70fa
commit 0ad3423219
5 changed files with 275 additions and 0 deletions

View File

@ -0,0 +1,172 @@
package org.baeldung.batchscheduler;
import java.util.Date;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.baeldung.batchscheduler.model.Book;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
@Configuration
@EnableBatchProcessing
@EnableScheduling
public class SpringBatchScheduler {
private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class);
private AtomicBoolean enabled = new AtomicBoolean(true);
private Date currentLaunchDate;
private final Map<Object, ScheduledFuture<?>> scheduledTasks = new IdentityHashMap<>();
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Scheduled(fixedRate = 2000)
public void launchJob() throws Exception {
Date date = new Date();
logger.debug("scheduler starts at " + date);
if (enabled.get()) {
currentLaunchDate = date;
JobExecution jobExecution = jobLauncher().run(job(), new JobParametersBuilder().addDate("launchDate", currentLaunchDate)
.toJobParameters());
logger.debug("Batch job ends with status as " + jobExecution.getStatus());
}
logger.debug("scheduler ends ");
}
public Date getCurrentLaunchDate() {
return currentLaunchDate;
}
public void stop() {
enabled.set(false);
}
public void start() {
enabled.set(true);
}
@Bean
public TaskScheduler poolScheduler() {
return new CustomTaskScheduler();
}
private class CustomTaskScheduler extends ThreadPoolTaskScheduler {
private static final long serialVersionUID = -7142624085505040603L;
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
ScheduledFuture<?> future = super.scheduleAtFixedRate(task, period);
ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task;
scheduledTasks.put(runnable.getTarget(), future);
return future;
}
}
public void cancelFutureSchedulerTasks() {
scheduledTasks.forEach((k, v) -> {
if (k instanceof SpringBatchScheduler) {
v.cancel(false);
}
});
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(readBooks())
.build();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public JobRepository jobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(new ResourcelessTransactionManager());
return (JobRepository) factory.getObject();
}
@Bean
protected Step readBooks() {
return stepBuilderFactory.get("readBooks")
.<Book, Book> chunk(2)
.reader(reader())
.writer(writer())
.build();
}
@Bean
public FlatFileItemReader<Book> reader() {
return new FlatFileItemReaderBuilder<Book>().name("bookItemReader")
.resource(new ClassPathResource("books.csv"))
.delimited()
.names(new String[] { "id", "name" })
.fieldSetMapper(new BeanWrapperFieldSetMapper<Book>() {
{
setTargetType(Book.class);
}
})
.build();
}
@Bean
public ItemWriter<Book> writer() {
return new ItemWriter<Book>() {
@Override
public void write(List<? extends Book> items) throws Exception {
logger.debug("writer..." + items.size());
for (Book item : items) {
logger.debug(item.toString());
}
}
};
}
}

View File

@ -0,0 +1,35 @@
package org.baeldung.batchscheduler.model;
public class Book {
private int id;
private String name;
public Book() {}
public Book(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return "Book [id=" + id + ", name=" + name + "]";
}
}

View File

@ -0,0 +1,4 @@
1,SHARP OBJECTS (MOVIE TIE-IN): A NOVEL
2,ARTEMIS: A NOVEL
3,HER PRETTY FACE
4,ALL WE EVER WANTED
1 1 SHARP OBJECTS (MOVIE TIE-IN): A NOVEL
2 2 ARTEMIS: A NOVEL
3 3 HER PRETTY FACE
4 4 ALL WE EVER WANTED

View File

@ -12,6 +12,10 @@
additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.baeldung.batchscheduler" level="debug" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<root level="error">
<appender-ref ref="STDOUT" />

View File

@ -0,0 +1,60 @@
package org.baeldung.batchscheduler;
import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringBatchScheduler.class)
public class SpringBatchSchedulerIntegrationTest {
private static final int TIMER = 3000;
@Autowired
private ApplicationContext context;
@Test
public void stopJobsWhenSchedulerDisabled() throws Exception {
Thread.sleep(TIMER);
SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class);
schedulerBean.stop();
Thread.sleep(TIMER);
Date lastLaunchDate = schedulerBean.getCurrentLaunchDate();
Thread.sleep(TIMER);
Assert.assertEquals(lastLaunchDate, schedulerBean.getCurrentLaunchDate());
}
@Test
public void stopJobSchedulerWhenSchedulerDestroyed() throws Exception {
Thread.sleep(TIMER);
ScheduledAnnotationBeanPostProcessor bean = context.getBean(ScheduledAnnotationBeanPostProcessor.class);
SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class);
bean.postProcessBeforeDestruction(schedulerBean, "SpringBatchScheduler");
Thread.sleep(TIMER);
Date lastLaunchTime = schedulerBean.getCurrentLaunchDate();
Thread.sleep(TIMER);
Assert.assertEquals(lastLaunchTime, schedulerBean.getCurrentLaunchDate());
}
@Test
public void stopJobSchedulerWhenFutureTasksCancelled() throws Exception {
Thread.sleep(TIMER);
SpringBatchScheduler schedulerBean = context.getBean(SpringBatchScheduler.class);
schedulerBean.cancelFutureSchedulerTasks();
Thread.sleep(TIMER);
Date lastLaunchTime = schedulerBean.getCurrentLaunchDate();
Thread.sleep(TIMER);
Assert.assertEquals(lastLaunchTime, schedulerBean.getCurrentLaunchDate());
}
}