Add invalid parameter validator
This commit is contained in:
parent
a25a1064f8
commit
b1d2ab7619
|
@ -4,6 +4,7 @@ import ca.uhn.fhir.jpa.batch.processors.PidToIBaseResourceProcessor;
|
||||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
|
import org.springframework.batch.core.JobParametersValidator;
|
||||||
import org.springframework.batch.core.Step;
|
import org.springframework.batch.core.Step;
|
||||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||||
import org.springframework.batch.core.configuration.annotation.JobScope;
|
import org.springframework.batch.core.configuration.annotation.JobScope;
|
||||||
|
@ -40,11 +41,18 @@ public class BulkExportJobConfig {
|
||||||
@Bean
|
@Bean
|
||||||
public Job bulkExportJob() {
|
public Job bulkExportJob() {
|
||||||
return myJobBuilderFactory.get("bulkExportJob")
|
return myJobBuilderFactory.get("bulkExportJob")
|
||||||
|
.validator(jobExistsValidator())
|
||||||
.start(partitionStep())
|
.start(partitionStep())
|
||||||
.listener(bulkExportJobCompletionListener())
|
.listener(bulkExportJobCompletionListener())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public JobParametersValidator jobExistsValidator() {
|
||||||
|
return new JobExistsParameterValidator();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public Step bulkExportGenerateResourceFilesStep() {
|
public Step bulkExportGenerateResourceFilesStep() {
|
||||||
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
|
return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
package ca.uhn.fhir.jpa.bulk.job;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
|
||||||
|
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.springframework.batch.core.JobParameters;
|
||||||
|
import org.springframework.batch.core.JobParametersInvalidException;
|
||||||
|
import org.springframework.batch.core.JobParametersValidator;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class will prevent job running if the UUID is found to be non-existent, or invalid.
|
||||||
|
*/
|
||||||
|
public class JobExistsParameterValidator implements JobParametersValidator {
|
||||||
|
@Autowired
|
||||||
|
private IBulkExportJobDao myBulkExportJobDao;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void validate(JobParameters theJobParameters) throws JobParametersInvalidException {
|
||||||
|
String jobUUID = theJobParameters.getString("jobUUID");
|
||||||
|
if (StringUtils.isBlank(jobUUID)) {
|
||||||
|
throw new JobParametersInvalidException("You did not pass a jobUUID to this job!");
|
||||||
|
}
|
||||||
|
|
||||||
|
Optional<BulkExportJobEntity> oJob = myBulkExportJobDao.findByJobId(jobUUID);
|
||||||
|
if (!oJob.isPresent()) {
|
||||||
|
throw new JobParametersInvalidException("There is no persisted job that exists with UUID: " + jobUUID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -49,6 +49,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.batch.core.JobParameters;
|
import org.springframework.batch.core.JobParameters;
|
||||||
import org.springframework.batch.core.JobParametersBuilder;
|
import org.springframework.batch.core.JobParametersBuilder;
|
||||||
|
import org.springframework.batch.core.JobParametersInvalidException;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.data.domain.PageRequest;
|
import org.springframework.data.domain.PageRequest;
|
||||||
|
@ -192,7 +193,11 @@ public class BulkDataExportSvcImpl implements IBulkDataExportSvc {
|
||||||
.addString("jobUUID", theJobUuid)
|
.addString("jobUUID", theJobUuid)
|
||||||
.toJobParameters();
|
.toJobParameters();
|
||||||
|
|
||||||
|
try {
|
||||||
myJobSubmitter.runJob(myBulkExportJob, parameters);
|
myJobSubmitter.runJob(myBulkExportJob, parameters);
|
||||||
|
} catch (JobParametersInvalidException theE) {
|
||||||
|
ourLog.error("Unable to start job with UUID: {}, the parameters are invalid. {}", theJobUuid, theE.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
import org.springframework.batch.core.JobParametersBuilder;
|
import org.springframework.batch.core.JobParametersBuilder;
|
||||||
|
import org.springframework.batch.core.JobParametersInvalidException;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
|
||||||
|
@ -251,7 +252,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatchJobSubmitsAndRuns() throws InterruptedException {
|
public void testBatchJobSubmitsAndRuns() throws Exception {
|
||||||
createResources();
|
createResources();
|
||||||
|
|
||||||
// Create a bulk job
|
// Create a bulk job
|
||||||
|
@ -265,6 +266,18 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
|
||||||
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
|
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJobParametersValidatorRejectsInvalidParameters() {
|
||||||
|
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
|
||||||
|
try {
|
||||||
|
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
|
||||||
|
fail("Should have had invalid parameter execption!");
|
||||||
|
} catch (JobParametersInvalidException e) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public IBulkDataExportSvc.JobInfo awaitJobCompletion(String theJobId) throws InterruptedException {
|
public IBulkDataExportSvc.JobInfo awaitJobCompletion(String theJobId) throws InterruptedException {
|
||||||
while(true) {
|
while(true) {
|
||||||
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId);
|
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(theJobId);
|
||||||
|
|
|
@ -3,8 +3,9 @@ package ca.uhn.fhir.jpa.batch.api;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
import org.springframework.batch.core.JobExecution;
|
import org.springframework.batch.core.JobExecution;
|
||||||
import org.springframework.batch.core.JobParameters;
|
import org.springframework.batch.core.JobParameters;
|
||||||
|
import org.springframework.batch.core.JobParametersInvalidException;
|
||||||
|
|
||||||
public interface IBatchJobSubmitter {
|
public interface IBatchJobSubmitter {
|
||||||
|
|
||||||
JobExecution runJob(Job theJob, JobParameters theJobParameters);
|
JobExecution runJob(Job theJob, JobParameters theJobParameters) throws JobParametersInvalidException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package ca.uhn.fhir.jpa.batch.svc;
|
package ca.uhn.fhir.jpa.batch.svc;
|
||||||
|
|
||||||
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
|
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
|
||||||
|
import org.slf4j.Logger;
|
||||||
import org.springframework.batch.core.Job;
|
import org.springframework.batch.core.Job;
|
||||||
import org.springframework.batch.core.JobExecution;
|
import org.springframework.batch.core.JobExecution;
|
||||||
import org.springframework.batch.core.JobParameters;
|
import org.springframework.batch.core.JobParameters;
|
||||||
|
@ -8,31 +9,32 @@ import org.springframework.batch.core.JobParametersInvalidException;
|
||||||
import org.springframework.batch.core.launch.JobLauncher;
|
import org.springframework.batch.core.launch.JobLauncher;
|
||||||
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
|
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
|
||||||
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
|
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
|
||||||
|
import org.springframework.batch.core.repository.JobRepository;
|
||||||
import org.springframework.batch.core.repository.JobRestartException;
|
import org.springframework.batch.core.repository.JobRestartException;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.ApplicationContext;
|
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import static org.slf4j.LoggerFactory.getLogger;
|
||||||
|
|
||||||
public class BatchJobSubmitterImpl implements IBatchJobSubmitter {
|
public class BatchJobSubmitterImpl implements IBatchJobSubmitter {
|
||||||
|
|
||||||
|
private static final Logger ourLog = getLogger(BatchJobSubmitterImpl.class);
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private JobLauncher myJobLauncher;
|
private JobLauncher myJobLauncher;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JobRepository myJobRepository;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JobExecution runJob(Job theJob, JobParameters theJobParameters) {
|
public JobExecution runJob(Job theJob, JobParameters theJobParameters) throws JobParametersInvalidException{
|
||||||
try {
|
try {
|
||||||
return myJobLauncher.run(theJob, theJobParameters);
|
return myJobLauncher.run(theJob, theJobParameters);
|
||||||
} catch (JobExecutionAlreadyRunningException theE) {
|
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException theE) {
|
||||||
//FIXME properly handle these
|
ourLog.warn("Job {} was already running, ignoring the call to start.", theJob.getName());
|
||||||
theE.printStackTrace();
|
return myJobRepository.getLastJobExecution(theJob.getName(), theJobParameters);
|
||||||
} catch (JobRestartException theE) {
|
|
||||||
theE.printStackTrace();
|
|
||||||
} catch (JobInstanceAlreadyCompleteException theE) {
|
|
||||||
theE.printStackTrace();
|
|
||||||
} catch (JobParametersInvalidException theE) {
|
} catch (JobParametersInvalidException theE) {
|
||||||
theE.printStackTrace();
|
ourLog.error("Job Parameters passed to this job were invalid: {}", theE.getMessage());
|
||||||
}
|
throw theE;
|
||||||
return null;
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue