package com.baeldung.batchscheduler; import java.util.Date; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.baeldung.batchscheduler.model.Book; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.ScheduledMethodRunnable; @Configuration @EnableBatchProcessing @EnableScheduling public class SpringBatchScheduler { private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class); private AtomicBoolean enabled = new AtomicBoolean(true); private AtomicInteger batchRunCounter = new AtomicInteger(0); private final Map> scheduledTasks = new IdentityHashMap<>(); @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Scheduled(fixedRate = 2000) public void launchJob() throws Exception { Date date = new Date(); logger.debug("scheduler starts at " + date); if (enabled.get()) { JobExecution jobExecution = jobLauncher().run(job(), new JobParametersBuilder().addDate("launchDate", date) .toJobParameters()); batchRunCounter.incrementAndGet(); logger.debug("Batch job ends with status as " + jobExecution.getStatus()); } logger.debug("scheduler ends "); } public void stop() { enabled.set(false); } public void start() { enabled.set(true); } @Bean public TaskScheduler poolScheduler() { return new CustomTaskScheduler(); } private class CustomTaskScheduler extends ThreadPoolTaskScheduler { private static final long serialVersionUID = -7142624085505040603L; @Override public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) { ScheduledFuture future = super.scheduleAtFixedRate(task, period); ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task; scheduledTasks.put(runnable.getTarget(), future); return future; } } public void cancelFutureSchedulerTasks() { scheduledTasks.forEach((k, v) -> { if (k instanceof SpringBatchScheduler) { v.cancel(false); } }); } @Bean public Job job() { return jobBuilderFactory.get("job") .start(readBooks()) .build(); } @Bean public JobLauncher jobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); jobLauncher.afterPropertiesSet(); return jobLauncher; } @Bean public JobRepository jobRepository() throws Exception { MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(); factory.setTransactionManager(new ResourcelessTransactionManager()); return (JobRepository) factory.getObject(); } @Bean protected Step readBooks() { return stepBuilderFactory.get("readBooks") . chunk(2) .reader(reader()) .writer(writer()) .build(); } @Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder().name("bookItemReader") .resource(new ClassPathResource("books.csv")) .delimited() .names(new String[] { "id", "name" }) .fieldSetMapper(new BeanWrapperFieldSetMapper() { { setTargetType(Book.class); } }) .build(); } @Bean public ItemWriter writer() { return new ItemWriter() { @Override public void write(List items) throws Exception { logger.debug("writer..." + items.size()); for (Book item : items) { logger.debug(item.toString()); } } }; } public AtomicInteger getBatchRunCounter() { return batchRunCounter; } }