Merge pull request #15701 from TanyaOhotnik/BAEL-7185-new-master
BAEL-7185 Access Job Parameters from ItemReader in Spring Batch
This commit is contained in:
commit
afff6d37da
@ -48,12 +48,18 @@
|
|||||||
<version>${awaitility.version}</version>
|
<version>${awaitility.version}</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>${lombok.version}</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<spring.batch.version>5.0.0</spring.batch.version>
|
<spring.batch.version>5.0.0</spring.batch.version>
|
||||||
<awaitility.version>4.2.0</awaitility.version>
|
<awaitility.version>4.2.0</awaitility.version>
|
||||||
<start-class>com.baeldung.batch.SpringBootBatchProcessingApplication</start-class>
|
<start-class>com.baeldung.batch.SpringBootBatchProcessingApplication</start-class>
|
||||||
|
<lombok.version>1.18.28</lombok.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
</project>
|
</project>
|
@ -0,0 +1,78 @@
|
|||||||
|
package com.baeldung.batchreaderproperties;
|
||||||
|
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.Step;
|
||||||
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
|
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||||
|
import org.springframework.batch.core.launch.support.RunIdIncrementer;
|
||||||
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
|
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
|
import com.baeldung.batchreaderproperties.job.ExpiresSoonMedicineReader;
|
||||||
|
import com.baeldung.batchreaderproperties.job.MedicineProcessor;
|
||||||
|
import com.baeldung.batchreaderproperties.job.MedicineWriter;
|
||||||
|
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class BatchConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) {
|
||||||
|
|
||||||
|
ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate);
|
||||||
|
enrichWithJobParameters(jobParameters, medicineReader);
|
||||||
|
return medicineReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
|
||||||
|
MedicineProcessor medicineProcessor = new MedicineProcessor();
|
||||||
|
enrichWithJobParameters(jobParameters, medicineProcessor);
|
||||||
|
return medicineProcessor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@StepScope
|
||||||
|
public MedicineWriter medicineWriter(@Value("#{jobParameters}") Map<String, Object> jobParameters) {
|
||||||
|
MedicineWriter medicineWriter = new MedicineWriter();
|
||||||
|
enrichWithJobParameters(jobParameters, medicineWriter);
|
||||||
|
return medicineWriter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Job medExpirationJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, MedicineWriter medicineWriter, MedicineProcessor medicineProcessor, ExpiresSoonMedicineReader expiresSoonMedicineReader) {
|
||||||
|
Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10)
|
||||||
|
.reader(expiresSoonMedicineReader)
|
||||||
|
.processor(medicineProcessor)
|
||||||
|
.writer(medicineWriter)
|
||||||
|
.faultTolerant()
|
||||||
|
.transactionManager(transactionManager)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return new JobBuilder("medExpirationJob", jobRepository).incrementer(new RunIdIncrementer())
|
||||||
|
.start(notifyAboutExpiringMedicine)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) {
|
||||||
|
if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) {
|
||||||
|
container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME)
|
||||||
|
.toString()));
|
||||||
|
}
|
||||||
|
if (jobParameters.get(BatchConstants.TRACE_ID) != null) {
|
||||||
|
container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID)
|
||||||
|
.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
package com.baeldung.batchreaderproperties;
|
||||||
|
|
||||||
|
public class BatchConstants {
|
||||||
|
public static final String TRIGGERED_DATE_TIME = "TRIGGERED_DATE_TIME";
|
||||||
|
public static final String TRACE_ID = "TRACE_ID";
|
||||||
|
public static final String ALERT_TYPE = "ALERT_TYPE";
|
||||||
|
public static final String DEFAULT_EXPIRATION = "DEFAULT_EXPIRATION";
|
||||||
|
public static final String SALE_STARTS_DAYS = "SALE_STARTS_DAYS";
|
||||||
|
public static final String MEDICINE_SALE = "MEDICINE_SALE";
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
package com.baeldung.batchreaderproperties;
|
||||||
|
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
|
||||||
|
public interface ContainsJobParameters {
|
||||||
|
ZonedDateTime getTriggeredDateTime();
|
||||||
|
String getTraceId();
|
||||||
|
|
||||||
|
void setTriggeredDateTime(ZonedDateTime triggeredDateTime);
|
||||||
|
void setTraceId(String traceId);
|
||||||
|
}
|
@ -0,0 +1,65 @@
|
|||||||
|
package com.baeldung.batchreaderproperties;
|
||||||
|
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.JobParameters;
|
||||||
|
import org.springframework.batch.core.JobParametersBuilder;
|
||||||
|
import org.springframework.batch.core.launch.JobLauncher;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@EnableScheduling
|
||||||
|
public class MedExpirationBatchRunner {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private Job medExpirationJob;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JobLauncher jobLauncher;
|
||||||
|
|
||||||
|
@Value("${batch.medicine.alert_type}")
|
||||||
|
private String alertType;
|
||||||
|
|
||||||
|
@Value("${batch.medicine.expiration.default.days}")
|
||||||
|
private long defaultExpiration;
|
||||||
|
|
||||||
|
@Value("${batch.medicine.start.sale.default.days}")
|
||||||
|
private long saleStartDays;
|
||||||
|
|
||||||
|
@Value("${batch.medicine.sale}")
|
||||||
|
private double medicineSale;
|
||||||
|
|
||||||
|
@Scheduled(cron = "${batch.medicine.cron}", zone = "GMT")
|
||||||
|
public void runJob() {
|
||||||
|
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||||
|
launchJob(now);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void launchJob(ZonedDateTime triggerZonedDateTime) {
|
||||||
|
try {
|
||||||
|
JobParameters jobParameters = new JobParametersBuilder().addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString())
|
||||||
|
.addString(BatchConstants.ALERT_TYPE, alertType)
|
||||||
|
.addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration)
|
||||||
|
.addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays)
|
||||||
|
.addDouble(BatchConstants.MEDICINE_SALE, medicineSale)
|
||||||
|
.addString(BatchConstants.TRACE_ID, UUID.randomUUID()
|
||||||
|
.toString())
|
||||||
|
.toJobParameters();
|
||||||
|
|
||||||
|
jobLauncher.run(medExpirationJob, jobParameters);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to run", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,15 @@
|
|||||||
|
package com.baeldung.batchreaderproperties;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.context.annotation.PropertySource;
|
||||||
|
|
||||||
|
@SpringBootApplication
|
||||||
|
@PropertySource("classpath:disable-job-autorun.properties")
|
||||||
|
public class SpringBatchExpireMedicationApplication {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(SpringBatchExpireMedicationApplication.class, args);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,84 @@
|
|||||||
|
package com.baeldung.batchreaderproperties.job;
|
||||||
|
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.springframework.batch.core.JobParameters;
|
||||||
|
import org.springframework.batch.core.StepExecution;
|
||||||
|
import org.springframework.batch.core.annotation.BeforeStep;
|
||||||
|
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.util.ClassUtils;
|
||||||
|
|
||||||
|
import com.baeldung.batchreaderproperties.ContainsJobParameters;
|
||||||
|
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||||
|
import com.baeldung.batchreaderproperties.model.MedicineCategory;
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters {
|
||||||
|
|
||||||
|
private static final String FIND_EXPIRING_SOON_MEDICINE = "Select * from MEDICINE where EXPIRATION_DATE >= CURRENT_DATE AND EXPIRATION_DATE <= DATEADD('DAY', ?, CURRENT_DATE)";
|
||||||
|
//common job parameters populated in bean initialization
|
||||||
|
private ZonedDateTime triggeredDateTime;
|
||||||
|
private String traceId;
|
||||||
|
//job parameter injected by Spring
|
||||||
|
@Value("#{jobParameters['DEFAULT_EXPIRATION']}")
|
||||||
|
private long defaultExpiration;
|
||||||
|
|
||||||
|
private final JdbcTemplate jdbcTemplate;
|
||||||
|
|
||||||
|
private List<Medicine> expiringMedicineList;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Medicine doRead() {
|
||||||
|
if (expiringMedicineList != null && !expiringMedicineList.isEmpty()) {
|
||||||
|
return expiringMedicineList.get(getCurrentItemCount() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doOpen() {
|
||||||
|
expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs));
|
||||||
|
|
||||||
|
log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size());
|
||||||
|
if (!expiringMedicineList.isEmpty()) {
|
||||||
|
setMaxItemCount(expiringMedicineList.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Medicine getMedicine(ResultSet rs) throws SQLException {
|
||||||
|
return new Medicine(UUID.fromString(rs.getString(1)), rs.getString(2), MedicineCategory.valueOf(rs.getString(3)), rs.getTimestamp(4), rs.getDouble(5), rs.getObject(6, Double.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
setName(ClassUtils.getShortName(getClass()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeStep
|
||||||
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
|
JobParameters parameters = stepExecution.getJobExecution()
|
||||||
|
.getJobParameters();
|
||||||
|
log.info("Before step params: {}", parameters);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,46 @@
|
|||||||
|
package com.baeldung.batchreaderproperties.job;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
|
||||||
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
|
||||||
|
import com.baeldung.batchreaderproperties.ContainsJobParameters;
|
||||||
|
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
public class MedicineProcessor implements ItemProcessor<Medicine, Medicine>, ContainsJobParameters {
|
||||||
|
|
||||||
|
private ZonedDateTime triggeredDateTime;
|
||||||
|
private String traceId;
|
||||||
|
|
||||||
|
@Value("#{jobParameters['SALE_STARTS_DAYS']}")
|
||||||
|
private long saleStartsDays;
|
||||||
|
@Value("#{jobParameters['MEDICINE_SALE']}")
|
||||||
|
private double medicineSale;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Medicine process(Medicine medicine) {
|
||||||
|
|
||||||
|
final Double originalPrice = medicine.getOriginalPrice();
|
||||||
|
final Timestamp expirationDate = medicine.getExpirationDate();
|
||||||
|
|
||||||
|
Duration daysToExpiration = Duration.between(ZonedDateTime.now(), ZonedDateTime.ofInstant(expirationDate.toInstant(), ZoneId.of("UTC")));
|
||||||
|
|
||||||
|
if (daysToExpiration.toDays() < saleStartsDays) {
|
||||||
|
medicine.setSalePrice(originalPrice * (1 - medicineSale));
|
||||||
|
log.info("Trace = {}, calculated new sale price {} for medicine {}", traceId, medicine.getSalePrice(), medicine.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
return medicine;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package com.baeldung.batchreaderproperties.job;
|
||||||
|
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
|
||||||
|
import org.springframework.batch.item.Chunk;
|
||||||
|
import org.springframework.batch.item.ItemWriter;
|
||||||
|
|
||||||
|
import com.baeldung.batchreaderproperties.ContainsJobParameters;
|
||||||
|
import com.baeldung.batchreaderproperties.model.Medicine;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@Slf4j
|
||||||
|
public class MedicineWriter implements ItemWriter<Medicine>, ContainsJobParameters {
|
||||||
|
|
||||||
|
private ZonedDateTime triggeredDateTime;
|
||||||
|
private String traceId;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(Chunk<? extends Medicine> chunk) {
|
||||||
|
chunk.forEach((medicine) -> log.info("Trace = {}. This medicine is expiring {}", traceId, medicine));
|
||||||
|
|
||||||
|
log.info("Finishing job started at {}", triggeredDateTime);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
package com.baeldung.batchreaderproperties.model;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class Medicine {
|
||||||
|
private UUID id;
|
||||||
|
private String name;
|
||||||
|
private MedicineCategory type;
|
||||||
|
private Timestamp expirationDate;
|
||||||
|
private Double originalPrice;
|
||||||
|
private Double salePrice;
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
package com.baeldung.batchreaderproperties.model;
|
||||||
|
|
||||||
|
public enum MedicineCategory {
|
||||||
|
ANESTHETICS, ANTIBACTERIALS, ANTIDEPRESSANTS;
|
||||||
|
}
|
@ -1 +1,7 @@
|
|||||||
file.input=coffee-list.csv
|
file.input=coffee-list.csv
|
||||||
|
##medicine batch related properties
|
||||||
|
batch.medicine.cron=0 */1 * * * *
|
||||||
|
batch.medicine.alert_type=LOGS
|
||||||
|
batch.medicine.expiration.default.days=60
|
||||||
|
batch.medicine.start.sale.default.days=45
|
||||||
|
batch.medicine.sale=0.1
|
4
spring-batch-2/src/main/resources/data.sql
Normal file
4
spring-batch-2/src/main/resources/data.sql
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null);
|
||||||
|
INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null);
|
||||||
|
INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null);
|
||||||
|
INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);
|
@ -0,0 +1 @@
|
|||||||
|
spring.batch.job.enabled=false
|
@ -6,3 +6,14 @@ CREATE TABLE coffee (
|
|||||||
origin VARCHAR(20),
|
origin VARCHAR(20),
|
||||||
characteristics VARCHAR(30)
|
characteristics VARCHAR(30)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
DROP TABLE medicine IF EXISTS;
|
||||||
|
|
||||||
|
CREATE TABLE medicine (
|
||||||
|
med_id VARCHAR(36) PRIMARY KEY,
|
||||||
|
name VARCHAR(30),
|
||||||
|
type VARCHAR(30),
|
||||||
|
expiration_date TIMESTAMP,
|
||||||
|
original_price DECIMAL,
|
||||||
|
sale_price DECIMAL
|
||||||
|
);
|
Loading…
x
Reference in New Issue
Block a user