From 5027ed7a5814e8659ce135d421bb60abfe64a976 Mon Sep 17 00:00:00 2001 From: Tetiana Okhotnik Date: Sun, 21 Jan 2024 13:40:16 +0200 Subject: [PATCH 1/2] BAEL-7185 Access Job Parameters from ItemReader in Spring Batch - added implementation for batch that track expiring medicine to demonstrate JobParameters usage --- spring-batch-2/pom.xml | 6 ++ .../BatchConfiguration.java | 78 +++++++++++++++++ .../batchreaderproperties/BatchConstants.java | 13 +++ .../ContainsJobParameters.java | 11 +++ .../MedExpirationBatchRunner.java | 65 ++++++++++++++ ...pringBatchExpireMedicationApplication.java | 15 ++++ .../job/ExpiresSoonMedicineReader.java | 84 +++++++++++++++++++ .../job/MedicineProcessor.java | 46 ++++++++++ .../job/MedicineWriter.java | 29 +++++++ .../batchreaderproperties/model/Medicine.java | 18 ++++ .../model/MedicineCategory.java | 5 ++ .../src/main/resources/application.properties | 8 +- spring-batch-2/src/main/resources/data.sql | 4 + .../resources/disable-job-autorun.properties | 1 + .../src/main/resources/schema-all.sql | 11 +++ 15 files changed, 393 insertions(+), 1 deletion(-) create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConfiguration.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/ContainsJobParameters.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/MedExpirationBatchRunner.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/SpringBatchExpireMedicationApplication.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/ExpiresSoonMedicineReader.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineProcessor.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineWriter.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/Medicine.java create mode 100644 spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/MedicineCategory.java create mode 100644 spring-batch-2/src/main/resources/data.sql create mode 100644 spring-batch-2/src/main/resources/disable-job-autorun.properties diff --git a/spring-batch-2/pom.xml b/spring-batch-2/pom.xml index 378191c91c..5b6a012e96 100644 --- a/spring-batch-2/pom.xml +++ b/spring-batch-2/pom.xml @@ -48,12 +48,18 @@ ${awaitility.version} test + + org.projectlombok + lombok + ${lombok.version} + 5.0.0 4.2.0 com.baeldung.batch.SpringBootBatchProcessingApplication + 1.18.28 \ No newline at end of file diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConfiguration.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConfiguration.java new file mode 100644 index 0000000000..8fe567c4ea --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConfiguration.java @@ -0,0 +1,78 @@ +package com.baeldung.batchreaderproperties; + +import java.time.ZonedDateTime; +import java.util.Map; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.PlatformTransactionManager; + +import com.baeldung.batchreaderproperties.job.ExpiresSoonMedicineReader; +import com.baeldung.batchreaderproperties.job.MedicineProcessor; +import com.baeldung.batchreaderproperties.job.MedicineWriter; +import com.baeldung.batchreaderproperties.model.Medicine; + +@Configuration +public class BatchConfiguration { + + @Bean + @StepScope + public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map jobParameters) { + + ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate); + enrichWithJobParameters(jobParameters, medicineReader); + return medicineReader; + } + + @Bean + @StepScope + public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map jobParameters) { + MedicineProcessor medicineProcessor = new MedicineProcessor(); + enrichWithJobParameters(jobParameters, medicineProcessor); + return medicineProcessor; + } + + @Bean + @StepScope + public MedicineWriter medicineWriter(@Value("#{jobParameters}") Map jobParameters) { + MedicineWriter medicineWriter = new MedicineWriter(); + enrichWithJobParameters(jobParameters, medicineWriter); + return medicineWriter; + } + + @Bean + public Job medExpirationJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, MedicineWriter medicineWriter, MedicineProcessor medicineProcessor, ExpiresSoonMedicineReader expiresSoonMedicineReader) { + Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).chunk(10) + .reader(expiresSoonMedicineReader) + .processor(medicineProcessor) + .writer(medicineWriter) + .faultTolerant() + .transactionManager(transactionManager) + .build(); + + return new JobBuilder("medExpirationJob", jobRepository).incrementer(new RunIdIncrementer()) + .start(notifyAboutExpiringMedicine) + .build(); + } + + private void enrichWithJobParameters(Map jobParameters, ContainsJobParameters container) { + if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) { + container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) + .toString())); + } + if (jobParameters.get(BatchConstants.TRACE_ID) != null) { + container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID) + .toString()); + } + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java new file mode 100644 index 0000000000..6ad170c7ad --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java @@ -0,0 +1,13 @@ +package com.baeldung.batchreaderproperties; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class BatchConstants { + public static final String TRIGGERED_DATE_TIME = "TRIGGERED_DATE_TIME"; + public static final String TRACE_ID = "TRACE_ID"; + public static final String ALERT_TYPE = "ALERT_TYPE"; + public static final String DEFAULT_EXPIRATION = "DEFAULT_EXPIRATION"; + public static final String SALE_STARTS_DAYS = "SALE_STARTS_DAYS"; + public static final String MEDICINE_SALE = "MEDICINE_SALE"; +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/ContainsJobParameters.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/ContainsJobParameters.java new file mode 100644 index 0000000000..517246ca07 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/ContainsJobParameters.java @@ -0,0 +1,11 @@ +package com.baeldung.batchreaderproperties; + +import java.time.ZonedDateTime; + +public interface ContainsJobParameters { + ZonedDateTime getTriggeredDateTime(); + String getTraceId(); + + void setTriggeredDateTime(ZonedDateTime triggeredDateTime); + void setTraceId(String traceId); +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/MedExpirationBatchRunner.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/MedExpirationBatchRunner.java new file mode 100644 index 0000000000..cf031727ba --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/MedExpirationBatchRunner.java @@ -0,0 +1,65 @@ +package com.baeldung.batchreaderproperties; + +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.UUID; + +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@EnableScheduling +public class MedExpirationBatchRunner { + + @Autowired + private Job medExpirationJob; + + @Autowired + private JobLauncher jobLauncher; + + @Value("${batch.medicine.alert_type}") + private String alertType; + + @Value("${batch.medicine.expiration.default.days}") + private long defaultExpiration; + + @Value("${batch.medicine.start.sale.default.days}") + private long saleStartDays; + + @Value("${batch.medicine.sale}") + private double medicineSale; + + @Scheduled(cron = "${batch.medicine.cron}", zone = "GMT") + public void runJob() { + ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); + launchJob(now); + } + + public void launchJob(ZonedDateTime triggerZonedDateTime) { + try { + JobParameters jobParameters = new JobParametersBuilder().addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString()) + .addString(BatchConstants.ALERT_TYPE, alertType) + .addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration) + .addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays) + .addDouble(BatchConstants.MEDICINE_SALE, medicineSale) + .addString(BatchConstants.TRACE_ID, UUID.randomUUID() + .toString()) + .toJobParameters(); + + jobLauncher.run(medExpirationJob, jobParameters); + } catch (Exception e) { + log.error("Failed to run", e); + } + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/SpringBatchExpireMedicationApplication.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/SpringBatchExpireMedicationApplication.java new file mode 100644 index 0000000000..f71dc323c7 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/SpringBatchExpireMedicationApplication.java @@ -0,0 +1,15 @@ +package com.baeldung.batchreaderproperties; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.PropertySource; + +@SpringBootApplication +@PropertySource("classpath:disable-job-autorun.properties") +public class SpringBatchExpireMedicationApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringBatchExpireMedicationApplication.class, args); + } + +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/ExpiresSoonMedicineReader.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/ExpiresSoonMedicineReader.java new file mode 100644 index 0000000000..821d054818 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/ExpiresSoonMedicineReader.java @@ -0,0 +1,84 @@ +package com.baeldung.batchreaderproperties.job; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; + +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.util.ClassUtils; + +import com.baeldung.batchreaderproperties.ContainsJobParameters; +import com.baeldung.batchreaderproperties.model.Medicine; +import com.baeldung.batchreaderproperties.model.MedicineCategory; + +import jakarta.annotation.PostConstruct; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Setter +@RequiredArgsConstructor +@Slf4j +public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader implements ContainsJobParameters { + + private static final String FIND_EXPIRING_SOON_MEDICINE = "Select * from MEDICINE where EXPIRATION_DATE >= CURRENT_DATE AND EXPIRATION_DATE <= DATEADD('DAY', ?, CURRENT_DATE)"; + //common job parameters populated in bean initialization + private ZonedDateTime triggeredDateTime; + private String traceId; + //job parameter injected by Spring + @Value("#{jobParameters['DEFAULT_EXPIRATION']}") + private long defaultExpiration; + + private final JdbcTemplate jdbcTemplate; + + private List expiringMedicineList; + + @Override + protected Medicine doRead() { + if (expiringMedicineList != null && !expiringMedicineList.isEmpty()) { + return expiringMedicineList.get(getCurrentItemCount() - 1); + } + + return null; + } + + @Override + protected void doOpen() { + expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs)); + + log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size()); + if (!expiringMedicineList.isEmpty()) { + setMaxItemCount(expiringMedicineList.size()); + } + } + + private static Medicine getMedicine(ResultSet rs) throws SQLException { + return new Medicine(UUID.fromString(rs.getString(1)), rs.getString(2), MedicineCategory.valueOf(rs.getString(3)), rs.getTimestamp(4), rs.getDouble(5), rs.getObject(6, Double.class)); + } + + @Override + protected void doClose() { + + } + + @PostConstruct + public void init() { + setName(ClassUtils.getShortName(getClass())); + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + JobParameters parameters = stepExecution.getJobExecution() + .getJobParameters(); + log.info("Before step params: {}", parameters); + } +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineProcessor.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineProcessor.java new file mode 100644 index 0000000000..5e4e853c98 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineProcessor.java @@ -0,0 +1,46 @@ +package com.baeldung.batchreaderproperties.job; + +import java.sql.Timestamp; +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Value; + +import com.baeldung.batchreaderproperties.ContainsJobParameters; +import com.baeldung.batchreaderproperties.model.Medicine; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +public class MedicineProcessor implements ItemProcessor, ContainsJobParameters { + + private ZonedDateTime triggeredDateTime; + private String traceId; + + @Value("#{jobParameters['SALE_STARTS_DAYS']}") + private long saleStartsDays; + @Value("#{jobParameters['MEDICINE_SALE']}") + private double medicineSale; + + @Override + public Medicine process(Medicine medicine) { + + final Double originalPrice = medicine.getOriginalPrice(); + final Timestamp expirationDate = medicine.getExpirationDate(); + + Duration daysToExpiration = Duration.between(ZonedDateTime.now(), ZonedDateTime.ofInstant(expirationDate.toInstant(), ZoneId.of("UTC"))); + + if (daysToExpiration.toDays() < saleStartsDays) { + medicine.setSalePrice(originalPrice * (1 - medicineSale)); + log.info("Trace = {}, calculated new sale price {} for medicine {}", traceId, medicine.getSalePrice(), medicine.getId()); + } + + return medicine; + } +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineWriter.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineWriter.java new file mode 100644 index 0000000000..47c1a6a831 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/job/MedicineWriter.java @@ -0,0 +1,29 @@ +package com.baeldung.batchreaderproperties.job; + +import java.time.ZonedDateTime; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; + +import com.baeldung.batchreaderproperties.ContainsJobParameters; +import com.baeldung.batchreaderproperties.model.Medicine; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Setter +@Slf4j +public class MedicineWriter implements ItemWriter, ContainsJobParameters { + + private ZonedDateTime triggeredDateTime; + private String traceId; + + @Override + public void write(Chunk chunk) { + chunk.forEach((medicine) -> log.info("Trace = {}. This medicine is expiring {}", traceId, medicine)); + + log.info("Finishing job started at {}", triggeredDateTime); + } +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/Medicine.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/Medicine.java new file mode 100644 index 0000000000..0d657f8a8b --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/Medicine.java @@ -0,0 +1,18 @@ +package com.baeldung.batchreaderproperties.model; + +import java.sql.Timestamp; +import java.util.UUID; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class Medicine { + private UUID id; + private String name; + private MedicineCategory type; + private Timestamp expirationDate; + private Double originalPrice; + private Double salePrice; +} diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/MedicineCategory.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/MedicineCategory.java new file mode 100644 index 0000000000..6a2f9b50e1 --- /dev/null +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/model/MedicineCategory.java @@ -0,0 +1,5 @@ +package com.baeldung.batchreaderproperties.model; + +public enum MedicineCategory { + ANESTHETICS, ANTIBACTERIALS, ANTIDEPRESSANTS; +} diff --git a/spring-batch-2/src/main/resources/application.properties b/spring-batch-2/src/main/resources/application.properties index 0b8c56d3f8..bbca1d65a2 100644 --- a/spring-batch-2/src/main/resources/application.properties +++ b/spring-batch-2/src/main/resources/application.properties @@ -1 +1,7 @@ -file.input=coffee-list.csv \ No newline at end of file +file.input=coffee-list.csv +##medicine batch related properties +batch.medicine.cron=0 */1 * * * * +batch.medicine.alert_type=LOGS +batch.medicine.expiration.default.days=60 +batch.medicine.start.sale.default.days=45 +batch.medicine.sale=0.1 \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/data.sql b/spring-batch-2/src/main/resources/data.sql new file mode 100644 index 0000000000..4147f20390 --- /dev/null +++ b/spring-batch-2/src/main/resources/data.sql @@ -0,0 +1,4 @@ +INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null); +INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null); +INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null); +INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null); \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/disable-job-autorun.properties b/spring-batch-2/src/main/resources/disable-job-autorun.properties new file mode 100644 index 0000000000..132728afcd --- /dev/null +++ b/spring-batch-2/src/main/resources/disable-job-autorun.properties @@ -0,0 +1 @@ +spring.batch.job.enabled=false \ No newline at end of file diff --git a/spring-batch-2/src/main/resources/schema-all.sql b/spring-batch-2/src/main/resources/schema-all.sql index c17b9f9749..8e04e9a0ed 100644 --- a/spring-batch-2/src/main/resources/schema-all.sql +++ b/spring-batch-2/src/main/resources/schema-all.sql @@ -5,4 +5,15 @@ CREATE TABLE coffee ( brand VARCHAR(20), origin VARCHAR(20), characteristics VARCHAR(30) +); + +DROP TABLE medicine IF EXISTS; + +CREATE TABLE medicine ( + med_id VARCHAR(36) PRIMARY KEY, + name VARCHAR(30), + type VARCHAR(30), + expiration_date TIMESTAMP, + original_price DECIMAL, + sale_price DECIMAL ); \ No newline at end of file From e416f9de2a15568da8b1e40682616ad152e71961 Mon Sep 17 00:00:00 2001 From: Tetiana Okhotnik Date: Tue, 30 Jan 2024 16:31:04 +0200 Subject: [PATCH 2/2] BAEL-7185 Access Job Parameters from ItemReader in Spring Batch - remove @UtilityClass usage --- .../com/baeldung/batchreaderproperties/BatchConstants.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java index 6ad170c7ad..ac86bd223a 100644 --- a/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java +++ b/spring-batch-2/src/main/java/com/baeldung/batchreaderproperties/BatchConstants.java @@ -1,8 +1,5 @@ package com.baeldung.batchreaderproperties; -import lombok.experimental.UtilityClass; - -@UtilityClass public class BatchConstants { public static final String TRIGGERED_DATE_TIME = "TRIGGERED_DATE_TIME"; public static final String TRACE_ID = "TRACE_ID";