BAEL-7185 Access Job Parameters from ItemReader in Spring Batch
- added implementation for batch that track expiring medicine to demonstrate JobParameters usage
This commit is contained in:
parent
870628f031
commit
5027ed7a58
|
@ -48,12 +48,18 @@
|
|||
<version>${awaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>${lombok.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<spring.batch.version>5.0.0</spring.batch.version>
|
||||
<awaitility.version>4.2.0</awaitility.version>
|
||||
<start-class>com.baeldung.batch.SpringBootBatchProcessingApplication</start-class>
|
||||
<lombok.version>1.18.28</lombok.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,78 @@
|
|||
package com.baeldung.batchreaderproperties;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.launch.support.RunIdIncrementer;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import com.baeldung.batchreaderproperties.job.ExpiresSoonMedicineReader;
|
||||
import com.baeldung.batchreaderproperties.job.MedicineProcessor;
|
||||
import com.baeldung.batchreaderproperties.job.MedicineWriter;
|
||||
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||
|
||||
@Configuration
|
||||
public class BatchConfiguration {
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {
|
||||
|
||||
ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
|
||||
enrichWithJobParameters(jobParameters, medicineReader);
|
||||
return medicineReader;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
|
||||
MedicineProcessor medicineProcessor = new MedicineProcessor();
|
||||
enrichWithJobParameters(jobParameters, medicineProcessor);
|
||||
return medicineProcessor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@StepScope
|
||||
public MedicineWriter medicineWriter(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
|
||||
MedicineWriter medicineWriter = new MedicineWriter();
|
||||
enrichWithJobParameters(jobParameters, medicineWriter);
|
||||
return medicineWriter;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Job medExpirationJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, MedicineWriter medicineWriter, MedicineProcessor medicineProcessor, ExpiresSoonMedicineReader expiresSoonMedicineReader) {
|
||||
Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
|
||||
.reader(expiresSoonMedicineReader)
|
||||
.processor(medicineProcessor)
|
||||
.writer(medicineWriter)
|
||||
.faultTolerant()
|
||||
.transactionManager(transactionManager)
|
||||
.build();
|
||||
|
||||
return new JobBuilder("medExpirationJob", jobRepository).incrementer(new RunIdIncrementer())
|
||||
.start(notifyAboutExpiringMedicine)
|
||||
.build();
|
||||
}
|
||||
|
||||
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
|
||||
if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
|
||||
container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
|
||||
.toString()));
|
||||
}
|
||||
if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
|
||||
container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID)
|
||||
.toString());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.baeldung.batchreaderproperties;
|
||||
|
||||
import lombok.experimental.UtilityClass;
|
||||
|
||||
@UtilityClass
|
||||
public class BatchConstants {
|
||||
public static final String TRIGGERED_DATE_TIME = "TRIGGERED_DATE_TIME";
|
||||
public static final String TRACE_ID = "TRACE_ID";
|
||||
public static final String ALERT_TYPE = "ALERT_TYPE";
|
||||
public static final String DEFAULT_EXPIRATION = "DEFAULT_EXPIRATION";
|
||||
public static final String SALE_STARTS_DAYS = "SALE_STARTS_DAYS";
|
||||
public static final String MEDICINE_SALE = "MEDICINE_SALE";
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
package com.baeldung.batchreaderproperties;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
public interface ContainsJobParameters {
|
||||
ZonedDateTime getTriggeredDateTime();
|
||||
String getTraceId();
|
||||
|
||||
void setTriggeredDateTime(ZonedDateTime triggeredDateTime);
|
||||
void setTraceId(String traceId);
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package com.baeldung.batchreaderproperties;
|
||||
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobParameters;
|
||||
import org.springframework.batch.core.JobParametersBuilder;
|
||||
import org.springframework.batch.core.launch.JobLauncher;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@EnableScheduling
|
||||
public class MedExpirationBatchRunner {
|
||||
|
||||
@Autowired
|
||||
private Job medExpirationJob;
|
||||
|
||||
@Autowired
|
||||
private JobLauncher jobLauncher;
|
||||
|
||||
@Value("${batch.medicine.alert_type}")
|
||||
private String alertType;
|
||||
|
||||
@Value("${batch.medicine.expiration.default.days}")
|
||||
private long defaultExpiration;
|
||||
|
||||
@Value("${batch.medicine.start.sale.default.days}")
|
||||
private long saleStartDays;
|
||||
|
||||
@Value("${batch.medicine.sale}")
|
||||
private double medicineSale;
|
||||
|
||||
@Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
|
||||
public void runJob() {
|
||||
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||
launchJob(now);
|
||||
}
|
||||
|
||||
public void launchJob(ZonedDateTime triggerZonedDateTime) {
|
||||
try {
|
||||
JobParameters jobParameters = new JobParametersBuilder().addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
|
||||
.addString(BatchConstants.ALERT_TYPE, alertType)
|
||||
.addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
|
||||
.addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
|
||||
.addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
|
||||
.addString(BatchConstants.TRACE_ID, UUID.randomUUID()
|
||||
.toString())
|
||||
.toJobParameters();
|
||||
|
||||
jobLauncher.run(medExpirationJob, jobParameters);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to run", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package com.baeldung.batchreaderproperties;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
|
||||
@SpringBootApplication
|
||||
@PropertySource("classpath:disable-job-autorun.properties")
|
||||
public class SpringBatchExpireMedicationApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SpringBatchExpireMedicationApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
package com.baeldung.batchreaderproperties.job;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.springframework.batch.core.JobParameters;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.annotation.BeforeStep;
|
||||
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
import com.baeldung.batchreaderproperties.ContainsJobParameters;
|
||||
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||
import com.baeldung.batchreaderproperties.model.MedicineCategory;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {
|
||||
|
||||
private static final String FIND_EXPIRING_SOON_MEDICINE = "Select * from MEDICINE where EXPIRATION_DATE >= CURRENT_DATE AND EXPIRATION_DATE <= DATEADD('DAY', ?, CURRENT_DATE)";
|
||||
//common job parameters populated in bean initialization
|
||||
private ZonedDateTime triggeredDateTime;
|
||||
private String traceId;
|
||||
//job parameter injected by Spring
|
||||
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
|
||||
private long defaultExpiration;
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
private List<Medicine> expiringMedicineList;
|
||||
|
||||
@Override
|
||||
protected Medicine doRead() {
|
||||
if (expiringMedicineList != null && !expiringMedicineList.isEmpty()) {
|
||||
return expiringMedicineList.get(getCurrentItemCount() - 1);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doOpen() {
|
||||
expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));
|
||||
|
||||
log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
|
||||
if (!expiringMedicineList.isEmpty()) {
|
||||
setMaxItemCount(expiringMedicineList.size());
|
||||
}
|
||||
}
|
||||
|
||||
private static Medicine getMedicine(ResultSet rs) throws SQLException {
|
||||
return new Medicine(UUID.fromString(rs.getString(1)), rs.getString(2), MedicineCategory.valueOf(rs.getString(3)), rs.getTimestamp(4), rs.getDouble(5), rs.getObject(6, Double.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
setName(ClassUtils.getShortName(getClass()));
|
||||
}
|
||||
|
||||
@BeforeStep
|
||||
public void beforeStep(StepExecution stepExecution) {
|
||||
JobParameters parameters = stepExecution.getJobExecution()
|
||||
.getJobParameters();
|
||||
log.info("Before step params: {}", parameters);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package com.baeldung.batchreaderproperties.job;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Duration;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import com.baeldung.batchreaderproperties.ContainsJobParameters;
|
||||
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
@Setter
|
||||
public class MedicineProcessor implements ItemProcessor<Medicine, Medicine>, ContainsJobParameters {
|
||||
|
||||
private ZonedDateTime triggeredDateTime;
|
||||
private String traceId;
|
||||
|
||||
@Value("#{jobParameters['SALE_STARTS_DAYS']}")
|
||||
private long saleStartsDays;
|
||||
@Value("#{jobParameters['MEDICINE_SALE']}")
|
||||
private double medicineSale;
|
||||
|
||||
@Override
|
||||
public Medicine process(Medicine medicine) {
|
||||
|
||||
final Double originalPrice = medicine.getOriginalPrice();
|
||||
final Timestamp expirationDate = medicine.getExpirationDate();
|
||||
|
||||
Duration daysToExpiration = Duration.between(ZonedDateTime.now(), ZonedDateTime.ofInstant(expirationDate.toInstant(), ZoneId.of("UTC")));
|
||||
|
||||
if (daysToExpiration.toDays() < saleStartsDays) {
|
||||
medicine.setSalePrice(originalPrice * (1 - medicineSale));
|
||||
log.info("Trace = {}, calculated new sale price {} for medicine {}", traceId, medicine.getSalePrice(), medicine.getId());
|
||||
}
|
||||
|
||||
return medicine;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.baeldung.batchreaderproperties.job;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
import org.springframework.batch.item.Chunk;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
|
||||
import com.baeldung.batchreaderproperties.ContainsJobParameters;
|
||||
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Slf4j
|
||||
public class MedicineWriter implements ItemWriter<Medicine>, ContainsJobParameters {
|
||||
|
||||
private ZonedDateTime triggeredDateTime;
|
||||
private String traceId;
|
||||
|
||||
@Override
|
||||
public void write(Chunk<? extends Medicine> chunk) {
|
||||
chunk.forEach((medicine) -> log.info("Trace = {}. This medicine is expiring {}", traceId, medicine));
|
||||
|
||||
log.info("Finishing job started at {}", triggeredDateTime);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.baeldung.batchreaderproperties.model;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.util.UUID;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class Medicine {
|
||||
private UUID id;
|
||||
private String name;
|
||||
private MedicineCategory type;
|
||||
private Timestamp expirationDate;
|
||||
private Double originalPrice;
|
||||
private Double salePrice;
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package com.baeldung.batchreaderproperties.model;
|
||||
|
||||
public enum MedicineCategory {
|
||||
ANESTHETICS, ANTIBACTERIALS, ANTIDEPRESSANTS;
|
||||
}
|
|
@ -1 +1,7 @@
|
|||
file.input=coffee-list.csv
|
||||
##medicine batch related properties
|
||||
batch.medicine.cron=0 */1 * * * *
|
||||
batch.medicine.alert_type=LOGS
|
||||
batch.medicine.expiration.default.days=60
|
||||
batch.medicine.start.sale.default.days=45
|
||||
batch.medicine.sale=0.1
|
|
@ -0,0 +1,4 @@
|
|||
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
|
||||
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
|
||||
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
|
||||
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);
|
|
@ -0,0 +1 @@
|
|||
spring.batch.job.enabled=false
|
|
@ -6,3 +6,14 @@ CREATE TABLE coffee (
|
|||
origin VARCHAR(20),
|
||||
characteristics VARCHAR(30)
|
||||
);
|
||||
|
||||
DROP TABLE medicine IF EXISTS;
|
||||
|
||||
CREATE TABLE medicine (
|
||||
med_id VARCHAR(36) PRIMARY KEY,
|
||||
name VARCHAR(30),
|
||||
type VARCHAR(30),
|
||||
expiration_date TIMESTAMP,
|
||||
original_price DECIMAL,
|
||||
sale_price DECIMAL
|
||||
);
|
Loading…
Reference in New Issue