package com.baeldung.batchscheduler;

import com.baeldung.batchscheduler.model.Book;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.ScheduledMethodRunnable;

import javax.sql.DataSource;
import java.util.Date;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Schedules a Spring Batch job (read books from a CSV, log them) on a fixed
 * rate, and demonstrates two ways of stopping the scheduled run:
 * <ul>
 *   <li>a conditional flag ({@link #stop()} / {@link #start()}) that lets the
 *       scheduler keep firing but skips launching the job;</li>
 *   <li>cancelling the underlying {@link ScheduledFuture} captured by the
 *       custom {@link TaskScheduler} ({@link #cancelFutureSchedulerTasks()}).</li>
 * </ul>
 * The job repository, launcher, and data source beans are provided by
 * {@code @EnableBatchProcessing}.
 */
@Configuration
@EnableBatchProcessing
@EnableScheduling
public class SpringBatchScheduler {

    private final Logger logger = LoggerFactory.getLogger(SpringBatchScheduler.class);

    // Gate checked on every scheduler tick; false => skip launching the job.
    private final AtomicBoolean enabled = new AtomicBoolean(true);

    // Counts successful job launches; exposed for tests via getBatchRunCounter().
    private final AtomicInteger batchRunCounter = new AtomicInteger(0);

    // Maps each @Scheduled target bean to its ScheduledFuture so the task can
    // later be cancelled. IdentityHashMap: keys are bean instances compared by
    // reference, not equals().
    private final Map<Object, ScheduledFuture<?>> scheduledTasks = new IdentityHashMap<>();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Fires every 2 seconds; launches the batch job only while {@link #enabled}
     * is set. A fresh launch date is added as a job parameter so each run gets
     * a distinct JobInstance.
     *
     * @throws Exception propagated from {@link JobLauncher#run}
     */
    @Scheduled(fixedRate = 2000)
    public void launchJob() throws Exception {
        Date date = new Date();
        logger.debug("scheduler starts at {}", date);
        if (enabled.get()) {
            JobExecution jobExecution = jobLauncher.run(job(), new JobParametersBuilder().addDate("launchDate", date)
                .toJobParameters());
            batchRunCounter.incrementAndGet();
            logger.debug("Batch job ends with status as {}", jobExecution.getStatus());
        }
        logger.debug("scheduler ends ");
    }

    /** Disables job launching; the scheduler itself keeps ticking. */
    public void stop() {
        enabled.set(false);
    }

    /** Re-enables job launching after {@link #stop()}. */
    public void start() {
        enabled.set(true);
    }

    /**
     * Custom scheduler bean that records each scheduled task's future,
     * enabling {@link #cancelFutureSchedulerTasks()}.
     */
    @Bean
    public TaskScheduler poolScheduler() {
        return new CustomTaskScheduler();
    }

    /**
     * ThreadPoolTaskScheduler that intercepts fixed-rate scheduling to capture
     * the {@link ScheduledFuture} of every {@code @Scheduled} method, keyed by
     * its target bean.
     */
    private class CustomTaskScheduler extends ThreadPoolTaskScheduler {

        private static final long serialVersionUID = -7142624085505040603L;

        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
            ScheduledFuture<?> future = super.scheduleAtFixedRate(task, period);
            // Only @Scheduled-annotated methods arrive wrapped in a
            // ScheduledMethodRunnable; guard the cast so arbitrary tasks
            // submitted to this scheduler don't throw ClassCastException.
            if (task instanceof ScheduledMethodRunnable) {
                ScheduledMethodRunnable runnable = (ScheduledMethodRunnable) task;
                scheduledTasks.put(runnable.getTarget(), future);
            }
            return future;
        }
    }

    /**
     * Hard-cancels the scheduled launcher task(s) owned by this bean; unlike
     * {@link #stop()}, the scheduler stops ticking entirely. Does not
     * interrupt a run already in progress ({@code cancel(false)}).
     */
    public void cancelFutureSchedulerTasks() {
        scheduledTasks.forEach((k, v) -> {
            if (k instanceof SpringBatchScheduler) {
                v.cancel(false);
            }
        });
    }

    /** Single-step job that reads and writes the books CSV. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
            .start(readBooks())
            .build();
    }

    /** Chunk-oriented step: reads {@link Book}s two at a time and logs them. */
    @Bean
    protected Step readBooks() {
        return stepBuilderFactory.get("readBooks")
            .<Book, Book> chunk(2)
            .reader(reader())
            .writer(writer())
            .build();
    }

    /**
     * Reads {@code books.csv} from the classpath; each delimited line is
     * mapped onto a {@link Book} via its {@code id}/{@code name} properties.
     */
    @Bean
    public FlatFileItemReader<Book> reader() {
        return new FlatFileItemReaderBuilder<Book>().name("bookItemReader")
            .resource(new ClassPathResource("books.csv"))
            .delimited()
            .names(new String[] { "id", "name" })
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Book>() {
                {
                    setTargetType(Book.class);
                }
            })
            .build();
    }

    /** Writer that simply logs each chunk's size and items. */
    @Bean
    public ItemWriter<Book> writer() {
        return new ItemWriter<Book>() {
            @Override
            public void write(List<? extends Book> items) throws Exception {
                logger.debug("writer...{}", items.size());
                for (Book item : items) {
                    logger.debug("{}", item);
                }
            }
        };
    }

    /** @return number of completed job launches, for test verification */
    public AtomicInteger getBatchRunCounter() {
        return batchRunCounter;
    }
}