This commit is contained in:
Tadgh 2021-02-22 10:25:24 -05:00
parent 1e0981b416
commit d31b04ee65
4 changed files with 53 additions and 5 deletions

View File

@ -24,6 +24,7 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
@ -34,10 +35,14 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.util.Arrays;
import java.util.Optional;
import static org.slf4j.LoggerFactory.getLogger;
/**
* This class will prevent a job from running if the UUID does not exist or is invalid.
*/
public class BulkExportJobParameterValidator implements JobParametersValidator {
private static final Logger ourLog = getLogger(BulkExportJobParameterValidator.class);
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
@ -82,8 +87,10 @@ public class BulkExportJobParameterValidator implements JobParametersValidator {
if (!StringUtils.isBlank(outputFormat) && !Constants.CT_FHIR_NDJSON.equals(outputFormat)) {
errorBuilder.append("The only allowed format for Bulk Export is currently " + Constants.CT_FHIR_NDJSON);
}
}
if (theJobParameters.getString("groupId") != null) {
ourLog.debug("detected we are running in group mode with group id [{}]", theJobParameters.getString("groupId"));
}
return errorBuilder.toString();
});

View File

@ -35,6 +35,7 @@ import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.util.UrlUtil;
@ -56,6 +57,13 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
Iterator<ResourcePersistentId> myPidIterator;
@Value("#{jobParameters['readChunkSize']}")
private Long READ_CHUNK_SIZE;
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Value("#{jobParameters['groupId']}")
private String myGroupId;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Autowired
@ -64,10 +72,7 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
private FhirContext myContext;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;
@Autowired
private MatchUrlService myMatchUrlService;
@ -89,6 +94,8 @@ public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
SearchParameterMap map = createSearchParameterMapFromTypeFilter(jobEntity, def);
if (myGroupId != null) {
}
if (jobEntity.getSince() != null) {
map.setLastUpdated(new DateRangeParam(jobEntity.getSince(), null));

View File

@ -39,6 +39,9 @@ public class ResourceTypePartitioner implements Partitioner {
@Value("#{jobExecutionContext['jobUUID']}")
private String myJobUUID;
@Value("#{jobExecutionContext['groupId']}")
private String myGroupId;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.bulk.api.BulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.GroupBulkDataExportOptions;
import ca.uhn.fhir.jpa.bulk.api.IBulkDataExportSvc;
import ca.uhn.fhir.jpa.bulk.job.BulkExportJobParametersBuilder;
import ca.uhn.fhir.jpa.bulk.model.BulkJobStatusEnum;
@ -24,9 +25,11 @@ import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.Group;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -79,6 +82,8 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@Qualifier("bulkExportJob")
private Job myBulkJob;
private String myPatientGroupIp;
@Test
public void testPurgeExpiredJobs() {
@ -497,6 +502,28 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testGroupBatchJobWorks() throws Exception {
createResources();
// Create a bulk job
IBulkDataExportSvc.JobInfo jobDetails = myBulkDataExportSvc.submitJob(new GroupBulkDataExportOptions(null, Sets.newHashSet("Patient", "Observation"), null, null, myPatientGroupIp));
//Add the UUID to the job
BulkExportJobParametersBuilder paramBuilder = new BulkExportJobParametersBuilder()
.setJobUUID(jobDetails.getJobId())
.setReadChunkSize(10L);
JobExecution jobExecution = myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
awaitJobCompletion(jobExecution);
IBulkDataExportSvc.JobInfo jobInfo = myBulkDataExportSvc.getJobInfoOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobInfo.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
assertThat(jobInfo.getFiles().size(), equalTo(2));
}
@Test
public void testJobParametersValidatorRejectsInvalidParameters() {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", "I'm not real!");
@ -518,6 +545,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
}
private void createResources() {
Group group = new Group();
for (int i = 0; i < 10; i++) {
Patient patient = new Patient();
patient.setId("PAT" + i);
@ -525,6 +553,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
patient.addName().setFamily("FAM" + i);
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
group.addMember().setEntity(new Reference(patId));
Observation obs = new Observation();
obs.setId("OBS" + i);
@ -533,5 +562,7 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);
}
myPatientGroupIp = myGroupDao.create(group).getId().getValue();
}
}