This commit is contained in:
Tadgh 2020-06-05 15:53:06 -07:00
parent 7ed65b0080
commit 4405e50db9
11 changed files with 94 additions and 123 deletions

View File

@ -34,7 +34,6 @@ import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
@ -43,6 +42,7 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;

View File

@ -32,10 +32,14 @@ public class BulkExportDaoSvc {
IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public void addFileToCollection(BulkExportCollectionEntity theCollectionEntity, BulkExportCollectionFileEntity theFile) {
theCollectionEntity.getFiles().add(theFile);
myBulkExportCollectionDao.saveAndFlush(theCollectionEntity);
myBulkExportCollectionFileDao.saveAndFlush(theFile);
public void addFileToCollectionWithId(Long theCollectionEntityId, BulkExportCollectionFileEntity theFile) {
Optional<BulkExportCollectionEntity> byId = myBulkExportCollectionDao.findById(theCollectionEntityId);
if (byId.isPresent()) {
BulkExportCollectionEntity exportCollectionEntity = byId.get();
theFile.setCollection(exportCollectionEntity);
myBulkExportCollectionFileDao.saveAndFlush(theFile);
myBulkExportCollectionDao.saveAndFlush(exportCollectionEntity);
}
}
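For context, a minimal sketch of how the new ID-based signature is meant to be called from a batch step (this mirrors the ResourceToFileWriter change later in this commit; createdId and myCollectionEntityId are stand-ins):

// The caller holds only the collection's PID; the entity itself is
// re-fetched by ID inside the service method, rather than being passed
// as a detached instance across the batch step boundary.
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
file.setResource(createdId.getIdPart());
myBulkExportDaoSvc.addFileToCollectionWithId(myCollectionEntityId, file);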

View File

@ -0,0 +1,39 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.jpa.bulk.BulkJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.Optional;
public class BulkExportJobCompletionListener implements JobExecutionListener {
@Value("#{jobParameters['jobUUID']}")
private String myJobUUID;
@Autowired
private IBulkExportJobDao myBulkExportJobDao;
@Override
public void beforeJob(JobExecution theJobExecution) {
}
@Override
public void afterJob(JobExecution theJobExecution) {
if (theJobExecution.getStatus() == BatchStatus.COMPLETED) {
Optional<BulkExportJobEntity> byJobId = myBulkExportJobDao.findByJobId(myJobUUID);
if (byJobId.isPresent()) {
BulkExportJobEntity bulkExportJobEntity = byJobId.get();
bulkExportJobEntity.setStatus(BulkJobStatusEnum.COMPLETE);
myBulkExportJobDao.save(bulkExportJobEntity);
}
}
}
}
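The listener reads myJobUUID from the job parameters, so the bean must be job-scoped (the test config below registers it with @JobScope). A minimal launch sketch, assuming a standard Spring Batch JobLauncher; jobEntity is a stand-in:

// 'jobUUID' is the parameter the @Value SpEL expression above resolves.
JobParameters params = new JobParametersBuilder()
.addString("jobUUID", jobEntity.getJobId())
.toJobParameters();
jobLauncher.run(bulkExportJob, params);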

View File

@ -16,9 +16,6 @@ import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -31,7 +28,6 @@ import java.util.Optional;
public class BulkItemReader extends AbstractItemCountingItemStreamItemReader<ResourcePersistentId> {
private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class);
@Autowired
private IBulkExportJobDao myBulkExportJobDao;

View File

@ -41,10 +41,10 @@ public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePe
IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(Collections.singletonList(theResourcePersistentId), Collections.emptyList(), outgoing, false, null);
//Update bytes taken so far.
return outgoing.get(0);
}

View File

@ -1,75 +0,0 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.apache.commons.io.FileUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import static org.slf4j.LoggerFactory.getLogger;
public class OutputFileSizeCompletionPolicy implements CompletionPolicy, ItemProcessListener<ResourcePersistentId, IBaseResource> {
private static final Logger ourLog = getLogger(OutputFileSizeCompletionPolicy.class);
private long myTotalBytesSoFar;
private long myFileMaxChars = 500 * FileUtils.ONE_KB;
@Autowired
private FhirContext myFhirContext;
private IParser myParser;
@PostConstruct
public void start() {
myParser = myFhirContext.newJsonParser().setPrettyPrint(false);
}
@Override
public void beforeProcess(ResourcePersistentId theResourcePersistentId) {
ourLog.warn("Calling beforeProcess on Lisener");
}
@Override
public void afterProcess(ResourcePersistentId theResourcePersistentId, IBaseResource theIBaseResource) {
ourLog.warn("Calling afterProcess on Listener");
myTotalBytesSoFar += myParser.encodeResourceToString(theIBaseResource).getBytes().length;
ourLog.warn("total bytes in afterProcess: " + myTotalBytesSoFar);
}
@Override
public void onProcessError(ResourcePersistentId theResourcePersistentId, Exception theE) {
}
@Override
public boolean isComplete(RepeatContext theRepeatContext, RepeatStatus theRepeatStatus) {
return RepeatStatus.FINISHED == theRepeatStatus || isComplete(theRepeatContext);
}
@Override
public boolean isComplete(RepeatContext theRepeatContext) {
ourLog.warn("TOTAL BYTES SO FAR: " + myTotalBytesSoFar);
return myTotalBytesSoFar >= myFileMaxChars;
}
@Override
public RepeatContext start(RepeatContext theRepeatContext) {
ourLog.warn("Calling start on CompletionPolicy");
myTotalBytesSoFar = 0L;
return theRepeatContext;
}
@Override
public void update(RepeatContext theRepeatContext) {
ourLog.warn("Calling update on CompletionPolicy");
}
}
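This byte-counting completion policy is removed in favor of a fixed chunk size in the step definition below. Since chunk(2) is shorthand for Spring Batch's SimpleCompletionPolicy, an equivalent explicit form, should a tunable policy ever be reintroduced, would be roughly:

// Sketch only: commits after a fixed item count rather than a byte budget.
.<ResourcePersistentId, IBaseResource> chunk(new SimpleCompletionPolicy(2))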

View File

@ -4,7 +4,6 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.parser.IParser;
@ -37,16 +36,12 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
@Autowired
private FhirContext myContext;
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private BulkExportCollectionFileDaoSvc myBulkExportCollectionFileDaoSvc;
private BulkExportCollectionEntity myBulkExportCollectionEntity;
@ -62,7 +57,6 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
public ResourceToFileWriter() {
myOutputStream = new ByteArrayOutputStream();
myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8);
@ -74,19 +68,13 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
myBinaryDao = getBinaryDao();
}
private Optional<IIdType> flushToFiles(BulkExportCollectionEntity theExportCollectionEntity) {
private Optional<IIdType> flushToFiles() {
if (myOutputStream.size() > 0) {
IBaseBinary binary = BinaryUtil.newBinary(myContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(myOutputStream.toByteArray());
IIdType createdId = myBinaryDao.create(binary).getResource().getIdElement();
IIdType createdId = createBinaryFromOutputStream();
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
file.setCollection(theExportCollectionEntity);
file.setResource(createdId.getIdPart());
myBulkExportDaoSvc.addFileToCollection(theExportCollectionEntity, file);
myBulkExportDaoSvc.addFileToCollectionWithId(myBulkExportCollectionEntityId, file);
myOutputStream.reset();
@ -96,6 +84,14 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
return Optional.empty();
}
private IIdType createBinaryFromOutputStream() {
IBaseBinary binary = BinaryUtil.newBinary(myContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(myOutputStream.toByteArray());
return myBinaryDao.create(binary).getResource().getIdElement();
}
private BulkExportCollectionEntity getOrLoadBulkExportCollectionEntity() {
if (myBulkExportCollectionEntity == null) {
Optional<BulkExportCollectionEntity> oBulkExportCollectionEntity = myBulkExportCollectionDao.findById(myBulkExportCollectionEntityId);
@ -117,8 +113,7 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
myWriter.append("\n");
}
BulkExportCollectionEntity exportCollectionEntity = getOrLoadBulkExportCollectionEntity();
Optional<IIdType> createdId = flushToFiles(exportCollectionEntity);
Optional<IIdType> createdId = flushToFiles();
createdId.ifPresent(theIIdType -> ourLog.warn("Created bulk export file {} containing resources of type {}", theIIdType.toUnqualifiedVersionless().getValue(), myBulkExportCollectionEntity.getResourceType()));
}
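A hedged sketch of how myBulkExportCollectionEntityId presumably reaches this step-scoped writer: the partitioner in the next file stores it in each worker step's execution context, and Spring Batch late-binds it. The field declaration is not shown in this diff, so this is an assumption:

// Assumed binding; the key matches the partitioner's
// context.putLong("bulkExportCollectionEntityId", ...) below.
@Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
private Long myBulkExportCollectionEntityId;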

View File

@ -27,18 +27,22 @@ public class ResourceTypePartitioner implements Partitioner {
Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap( myJobUUID);
//Each BulkExportCollectionEntity ID maps to the resource type it collects, and
//each resource type may yield several output files (obs1.json, obs2.json, ...), e.g.:
// 123123 -> Patient
// 91876389126 -> Observation
idToResourceType.entrySet().stream()
.forEach(entry -> {
ExecutionContext context = new ExecutionContext();
String resourceType = entry.getValue();
Long collectionEntityId = entry.getKey();
ourLog.debug("Creating a partition step for CollectionEntity: [{}] processing resource type [{}]", collectionEntityId, resourceType);
//The slave step needs to know what resource type it is looking for.
ExecutionContext context = new ExecutionContext();
//The worker step needs to know what resource type it is looking for.
context.putString("resourceType", resourceType);
// The slave step needs to know which parent job it is processing for, and which collection entity it will be
// The worker step needs to know which parent job it is processing for, and which collection entity it will be
// attaching its results to.
context.putString("jobUUID", myJobUUID);
context.putLong("bulkExportCollectionEntityId", collectionEntityId);
@ -47,6 +51,7 @@ public class ResourceTypePartitioner implements Partitioner {
partitionContextMap.put(resourceType, context);
});
return partitionContextMap;
}

View File

@ -34,9 +34,11 @@ public class BulkExportCollectionFileEntity implements Serializable {
@SequenceGenerator(name = "SEQ_BLKEXCOLFILE_PID", sequenceName = "SEQ_BLKEXCOLFILE_PID")
@Column(name = "PID")
private Long myId;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "COLLECTION_PID", referencedColumnName = "PID", nullable = false, foreignKey = @ForeignKey(name="FK_BLKEXCOLFILE_COLLECT"))
private BulkExportCollectionEntity myCollection;
@Column(name = "RES_ID", length = ForcedId.MAX_FORCED_ID_LENGTH, nullable = false)
private String myResourceId;
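The @ManyToOne above makes the file entity the owning side of the relationship (it carries the COLLECTION_PID foreign key). The inverse side on BulkExportCollectionEntity is not shown in this diff, but given the getFiles() call removed earlier, it presumably looks roughly like this (a sketch, not confirmed by the source):

// Assumed inverse mapping; 'mappedBy' names the myCollection field above.
@OneToMany(fetch = FetchType.LAZY, mappedBy = "myCollection")
private Collection<BulkExportCollectionFileEntity> myFiles;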

View File

@ -320,10 +320,10 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
// Observation obs = new Observation();
// obs.setId("OBS" + i);
// obs.setStatus(Observation.ObservationStatus.FINAL);
// obs.getSubject().setReference(patId.getValue());
//Observation obs = new Observation();
//obs.setId("OBS" + i);
//obs.setStatus(Observation.ObservationStatus.FINAL);
//obs.getSubject().setReference(patId.getValue());
// myObservationDao.update(obs);
}
}

View File

@ -5,9 +5,9 @@ import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.bulk.batch.BulkExportJobCompletionListener;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemResourceLoaderProcessor;
import ca.uhn.fhir.jpa.bulk.batch.OutputFileSizeCompletionPolicy;
import ca.uhn.fhir.jpa.bulk.batch.ResourceToFileWriter;
import ca.uhn.fhir.jpa.bulk.batch.ResourceTypePartitioner;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
@ -52,6 +52,11 @@ import static org.junit.Assert.fail;
@EnableBatchProcessing
public class TestR4Config extends BaseJavaConfigR4 {
/**
* Placeholder for arguments to step-scoped bean methods whose real values
* are late-bound by Spring Batch at runtime (see myBulkItemReader below).
*/
public static final String WILL_LATE_BIND = null;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TestR4Config.class);
public static Integer ourMaxThreads;
@ -75,12 +80,6 @@ public class TestR4Config extends BaseJavaConfigR4 {
private Exception myLastStackTrace;
@Bean
@StepScope
public OutputFileSizeCompletionPolicy filesizeCompletionPolicy() {
return new OutputFileSizeCompletionPolicy();
}
@Bean
public IBatchJobSubmitter batchJobSubmitter() {
return new BatchJobSubmitterImpl();
@ -95,31 +94,37 @@ public class TestR4Config extends BaseJavaConfigR4 {
public Job bulkExportJob() {
return myJobBuilderFactory.get("bulkExportJob")
.start(partitionStep())
.listener(bulkExportJobCompletionListener())
.build();
}
@Bean
public Step slaveResourceStep() {
return myStepBuilderFactory.get("slaveResourceStep")
.<ResourcePersistentId, IBaseResource> chunk(filesizeCompletionPolicy())
.reader(myBulkItemReader(null))
public Step workerResourceStep() {
return myStepBuilderFactory.get("workerResourceStep")
.<ResourcePersistentId, IBaseResource> chunk(2)
.reader(myBulkItemReader(WILL_LATE_BIND))
.processor(pidToResourceProcessor())
.writer(resourceToFileWriter())
.listener(filesizeCompletionPolicy())
.build();
}
@Bean
@JobScope
public BulkExportJobCompletionListener bulkExportJobCompletionListener() {
return new BulkExportJobCompletionListener();
}
@Bean
@StepScope
public ItemWriter<IBaseResource> resourceToFileWriter() {
return new ResourceToFileWriter();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
.partitioner("slaveResourceStep", partitioner(null)).step(slaveResourceStep())
.partitioner("workerResourceStep", partitioner(null))
.step(workerResourceStep())
.build();
}
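On the WILL_LATE_BIND/null arguments: step-scoped beans such as myBulkItemReader are proxies, so the value passed at wiring time is ignored and the real argument is resolved when the step runs. A sketch of what the referenced bean method presumably looks like, binding the jobUUID the partitioner stored in the step execution context (the setter is hypothetical):

@Bean
@StepScope
public BulkItemReader myBulkItemReader(@Value("#{stepExecutionContext['jobUUID']}") String theJobUUID) {
BulkItemReader reader = new BulkItemReader();
reader.setJobUUID(theJobUUID); // hypothetical setter
return reader;
}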