partially done writer, working out size-based chunking

Tadgh 2020-06-04 20:58:06 -07:00
parent 7d46b3cc10
commit 7ed65b0080
8 changed files with 334 additions and 46 deletions
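The gist of the change: chunk boundaries in the slave step are no longer a fixed item count but are decided by a Spring Batch CompletionPolicy that tracks how many bytes have been serialized so far. A rough sketch of the wiring, mirroring the OutputFileSizeCompletionPolicy and TestR4Config changes below (bean and method names are taken from this commit; treat it as an outline, not the final implementation):

// Sketch only: the same step-scoped bean serves as the chunk's CompletionPolicy
// and as an ItemProcessListener, so the byte count it accumulates in afterProcess()
// can end the chunk from isComplete().
@Bean
@StepScope
public OutputFileSizeCompletionPolicy filesizeCompletionPolicy() {
   return new OutputFileSizeCompletionPolicy();
}

@Bean
public Step slaveResourceStep() {
   return myStepBuilderFactory.get("slaveResourceStep")
      .<ResourcePersistentId, IBaseResource>chunk(filesizeCompletionPolicy()) // size-based, not count-based
      .reader(myBulkItemReader(null))
      .processor(pidToResourceProcessor())
      .writer(resourceToFileWriter())
      .listener(filesizeCompletionPolicy())
      .build();
}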

@@ -0,0 +1,25 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import static org.slf4j.LoggerFactory.getLogger;
@Service
public class BulkExportCollectionFileDaoSvc {
private static final Logger ourLog = getLogger(BulkExportCollectionFileDaoSvc.class);
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public void save(BulkExportCollectionFileEntity theBulkExportCollectionFileEntity) {
myBulkExportCollectionFileDao.saveAndFlush(theBulkExportCollectionFileEntity);
}
}

@@ -1,14 +1,18 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -21,13 +25,29 @@ public class BulkExportDaoSvc {
@Autowired
IBulkExportJobDao myBulkExportJobDao;
@Autowired
IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Transactional
public List<String> getBulkJobResourceTypes(String theJobUUID) {
public void addFileToCollection(BulkExportCollectionEntity theCollectionEntity, BulkExportCollectionFileEntity theFile) {
theCollectionEntity.getFiles().add(theFile);
myBulkExportCollectionDao.saveAndFlush(theCollectionEntity);
myBulkExportCollectionFileDao.saveAndFlush(theFile);
}
@Transactional
public Map<Long, String> getBulkJobCollectionIdToResourceTypeMap(String theJobUUID) {
BulkExportJobEntity bulkExportJobEntity = loadJob(theJobUUID);
return bulkExportJobEntity.getCollections()
.stream()
.map(BulkExportCollectionEntity::getResourceType)
.collect(Collectors.toList());
Collection<BulkExportCollectionEntity> collections = bulkExportJobEntity.getCollections();
return collections.stream()
.collect(Collectors.toMap(
BulkExportCollectionEntity::getId,
BulkExportCollectionEntity::getResourceType
));
}
private BulkExportJobEntity loadJob(String theJobUUID) {

@@ -7,6 +7,7 @@ import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -15,7 +16,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePersistentId, IBaseResource> {
private static final Logger ourLog = getLogger(BulkItemResourceLoaderProcessor.class);
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@@ -29,6 +34,8 @@ public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePe
@Autowired
private FhirContext myContext;
@Override
public IBaseResource process(ResourcePersistentId theResourcePersistentId) throws Exception {
@@ -37,6 +44,9 @@ public class BulkItemResourceLoaderProcessor implements ItemProcessor<ResourcePe
ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
List<IBaseResource> outgoing = new ArrayList<>();
sb.loadResourcesByPid(Collections.singletonList(theResourcePersistentId), Collections.emptyList(), outgoing, false, null);
//Update bytes taken so far.
return outgoing.get(0);
}
}

@@ -0,0 +1,75 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.apache.commons.io.FileUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import static org.slf4j.LoggerFactory.getLogger;
public class OutputFileSizeCompletionPolicy implements CompletionPolicy, ItemProcessListener<ResourcePersistentId, IBaseResource> {
private static final Logger ourLog = getLogger(OutputFileSizeCompletionPolicy.class);
private long myTotalBytesSoFar;
private long myFileMaxChars = 500 * FileUtils.ONE_KB; // cap on serialized bytes per output file (~500 KB)
@Autowired
private FhirContext myFhirContext;
private IParser myParser;
@PostConstruct
public void start() {
myParser = myFhirContext.newJsonParser().setPrettyPrint(false);
}
@Override
public void beforeProcess(ResourcePersistentId theResourcePersistentId) {
ourLog.warn("Calling beforeProcess on Lisener");
}
@Override
public void afterProcess(ResourcePersistentId theResourcePersistentId, IBaseResource theIBaseResource) {
ourLog.warn("Calling afterProcess on Listener");
myTotalBytesSoFar += myParser.encodeResourceToString(theIBaseResource).getBytes(StandardCharsets.UTF_8).length;
ourLog.warn("total bytes in afterProcess: {}", myTotalBytesSoFar);
}
@Override
public void onProcessError(ResourcePersistentId theResourcePersistentId, Exception theE) {
}
@Override
public boolean isComplete(RepeatContext theRepeatContext, RepeatStatus theRepeatStatus) {
return RepeatStatus.FINISHED == theRepeatStatus || isComplete(theRepeatContext);
}
@Override
public boolean isComplete(RepeatContext theRepeatContext) {
ourLog.warn("TOTAL BYTES SO FAR: " + myTotalBytesSoFar);
return myTotalBytesSoFar >= myFileMaxChars;
}
@Override
public RepeatContext start(RepeatContext theRepeatContext) {
ourLog.warn("Calling start on CompletionPolicy");
myTotalBytesSoFar = 0L;
return theRepeatContext;
}
@Override
public void update(RepeatContext theRepeatContext) {
ourLog.warn("Calling update on CompletionPolicy");
}
}

@@ -0,0 +1,149 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.BinaryUtil;
import org.hl7.fhir.instance.model.api.IBaseBinary;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.PostConstruct;
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.util.List;
import java.util.Optional;
import static org.slf4j.LoggerFactory.getLogger;
public class ResourceToFileWriter implements ItemWriter<IBaseResource>, CompletionPolicy {
private static final Logger ourLog = getLogger(ResourceToFileWriter.class);
@Autowired
private FhirContext myContext;
@Autowired
private IBulkExportCollectionFileDao myBulkExportCollectionFileDao;
@Autowired
private IBulkExportCollectionDao myBulkExportCollectionDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private BulkExportCollectionFileDaoSvc myBulkExportCollectionFileDaoSvc;
private BulkExportCollectionEntity myBulkExportCollectionEntity;
private ByteArrayOutputStream myOutputStream;
private OutputStreamWriter myWriter;
private IParser myParser;
@Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
private Long myBulkExportCollectionEntityId;
private IFhirResourceDao<IBaseBinary> myBinaryDao;
@Autowired
private BulkExportDaoSvc myBulkExportDaoSvc;
public ResourceToFileWriter() {
myOutputStream = new ByteArrayOutputStream();
myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8);
}
@PostConstruct
public void start() {
myParser = myContext.newJsonParser().setPrettyPrint(false);
myBinaryDao = getBinaryDao();
}
private Optional<IIdType> flushToFiles(BulkExportCollectionEntity theExportCollectionEntity) {
if (myOutputStream.size() > 0) {
IBaseBinary binary = BinaryUtil.newBinary(myContext);
binary.setContentType(Constants.CT_FHIR_NDJSON);
binary.setContent(myOutputStream.toByteArray());
IIdType createdId = myBinaryDao.create(binary).getResource().getIdElement();
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
file.setCollection(theExportCollectionEntity);
file.setResource(createdId.getIdPart());
myBulkExportDaoSvc.addFileToCollection(theExportCollectionEntity, file);
myOutputStream.reset();
return Optional.of(createdId);
}
return Optional.empty();
}
private BulkExportCollectionEntity getOrLoadBulkExportCollectionEntity() {
if (myBulkExportCollectionEntity == null) {
Optional<BulkExportCollectionEntity> oBulkExportCollectionEntity = myBulkExportCollectionDao.findById(myBulkExportCollectionEntityId);
if (!oBulkExportCollectionEntity.isPresent()) {
throw new IllegalArgumentException("No BulkExportCollectionEntity exists with ID " + myBulkExportCollectionEntityId);
} else {
myBulkExportCollectionEntity = oBulkExportCollectionEntity.get();
}
}
return myBulkExportCollectionEntity;
}
@Override
public void write(List<? extends IBaseResource> resources) throws Exception {
for (IBaseResource nextFileResource : resources) {
myParser.encodeResourceToWriter(nextFileResource, myWriter);
myWriter.append("\n");
}
BulkExportCollectionEntity exportCollectionEntity = getOrLoadBulkExportCollectionEntity();
Optional<IIdType> createdId = flushToFiles(exportCollectionEntity);
createdId.ifPresent(theIIdType -> ourLog.warn("Created bulk export file {} containing resources of type {}", theIIdType.toUnqualifiedVersionless().getValue(), myBulkExportCollectionEntity.getResourceType()));
}
@SuppressWarnings("unchecked")
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
return myDaoRegistry.getResourceDao("Binary");
}
@Override
public boolean isComplete(RepeatContext theRepeatContext, RepeatStatus theRepeatStatus) {
return false;
}
@Override
public boolean isComplete(RepeatContext theRepeatContext) {
return false;
}
@Override
public RepeatContext start(RepeatContext theRepeatContext) {
return null;
}
@Override
public void update(RepeatContext theRepeatContext) {
}
}
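For reference on what the writer above produces: every flush turns the buffered newline-delimited JSON into one Binary resource (content type Constants.CT_FHIR_NDJSON) and records its id on a BulkExportCollectionFileEntity attached to the collection. A minimal sketch of reading such a file back, assuming a binaryId taken from one of those file entities (illustrative only, not part of this commit):

// Illustrative only: fetch a generated NDJSON file back out of the Binary DAO.
IFhirResourceDao<IBaseBinary> binaryDao = myDaoRegistry.getResourceDao("Binary");
IBaseBinary binary = binaryDao.read(new IdType("Binary", binaryId));
String ndjson = new String(binary.getContent(), Constants.CHARSET_UTF8);
// Each line of ndjson is one serialized resource of the collection's resource type.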

@@ -1,21 +1,11 @@
package ca.uhn.fhir.jpa.bulk.batch;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import org.slf4j.Logger;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Resource;
import javax.transaction.TransactionSynchronizationRegistry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.slf4j.LoggerFactory.getLogger;
@@ -36,13 +26,24 @@ public class ResourceTypePartitioner implements Partitioner {
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitionContextMap = new HashMap<>();
List<String> resourceTypes = myBulkExportDaoSvc.getBulkJobResourceTypes(myJobUUID);
Map<Long, String> idToResourceType = myBulkExportDaoSvc.getBulkJobCollectionIdToResourceTypeMap( myJobUUID);
resourceTypes.stream()
.forEach(resourceType -> {
idToResourceType.entrySet().stream()
.forEach(entry -> {
ExecutionContext context = new ExecutionContext();
String resourceType = entry.getValue();
Long collectionEntityId = entry.getKey();
ourLog.debug("Creating a partition step for CollectionEntity: [{}] processing resource type [{}]", collectionEntityId, resourceType);
//The slave step needs to know what resource type it is looking for.
context.putString("resourceType", resourceType);
// The slave step needs to know which parent job it is processing for, and which collection entity it will be
// attaching its results to.
context.putString("jobUUID", myJobUUID);
context.putLong("bulkExportCollectionEntityId", collectionEntityId);
// Name the partition based on the resource type
partitionContextMap.put(resourceType, context);
});
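For context on how these keys get consumed: each partition's ExecutionContext becomes the step execution context of its slave step, so step-scoped beans can late-bind the values with SpEL, exactly as ResourceToFileWriter does above for bulkExportCollectionEntityId. A sketch of the consuming side (the reader is not part of this diff, so the field names below are assumptions):

// Illustrative only: a @StepScope bean late-binding the keys this partitioner
// put into its partition's ExecutionContext.
@Value("#{stepExecutionContext['resourceType']}")
private String myResourceType;

@Value("#{stepExecutionContext['jobUUID']}")
private String myJobUUID;

@Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
private Long myBulkExportCollectionEntityId;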

@@ -17,27 +17,27 @@ import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
import org.hl7.fhir.r4.model.InstantType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.Date;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
@@ -258,6 +258,9 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
JobParametersBuilder paramBuilder = new JobParametersBuilder().addString("jobUUID", jobDetails.getJobId());
myBatchJobSubmitter.runJob(myBulkJob, paramBuilder.toJobParameters());
IBulkDataExportSvc.JobInfo jobStatusOrThrowResourceNotFound = myBulkDataExportSvc.getJobStatusOrThrowResourceNotFound(jobDetails.getJobId());
assertThat(jobStatusOrThrowResourceNotFound.getStatus(), equalTo(BulkJobStatusEnum.COMPLETE));
}
@Test
public void testSubmit_WithSince() throws InterruptedException {
@@ -317,11 +320,11 @@ public class BulkDataExportSvcImplR4Test extends BaseJpaR4Test {
patient.addIdentifier().setSystem("http://mrns").setValue("PAT" + i);
IIdType patId = myPatientDao.update(patient).getId().toUnqualifiedVersionless();
Observation obs = new Observation();
obs.setId("OBS" + i);
obs.setStatus(Observation.ObservationStatus.FINAL);
obs.getSubject().setReference(patId.getValue());
myObservationDao.update(obs);
// Observation obs = new Observation();
// obs.setId("OBS" + i);
// obs.setStatus(Observation.ObservationStatus.FINAL);
// obs.getSubject().setReference(patId.getValue());
// myObservationDao.update(obs);
}
}
}

@@ -7,6 +7,8 @@ import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.bulk.batch.BulkExportDaoSvc;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemReader;
import ca.uhn.fhir.jpa.bulk.batch.BulkItemResourceLoaderProcessor;
import ca.uhn.fhir.jpa.bulk.batch.OutputFileSizeCompletionPolicy;
import ca.uhn.fhir.jpa.bulk.batch.ResourceToFileWriter;
import ca.uhn.fhir.jpa.bulk.batch.ResourceTypePartitioner;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
@@ -28,14 +30,12 @@ import org.springframework.batch.core.configuration.annotation.StepBuilderFactor
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@@ -74,6 +74,13 @@ public class TestR4Config extends BaseJavaConfigR4 {
private Exception myLastStackTrace;
@Bean
@StepScope
public OutputFileSizeCompletionPolicy filesizeCompletionPolicy() {
return new OutputFileSizeCompletionPolicy();
}
@Bean
public IBatchJobSubmitter batchJobSubmitter() {
return new BatchJobSubmitterImpl();
@@ -94,13 +101,21 @@ public class TestR4Config extends BaseJavaConfigR4 {
@Bean
public Step slaveResourceStep() {
return myStepBuilderFactory.get("slaveResourceStep")
.<ResourcePersistentId, IBaseResource> chunk(2)
.<ResourcePersistentId, IBaseResource> chunk(filesizeCompletionPolicy())
.reader(myBulkItemReader(null))
.processor(pidToResourceProcessor(null))
.writer(mySimplePrinter())
.listener(myBulkItemReader(null))
.processor(pidToResourceProcessor())
.writer(resourceToFileWriter())
.listener(filesizeCompletionPolicy())
.build();
}
@Bean
@StepScope
public ItemWriter<IBaseResource> resourceToFileWriter() {
return new ResourceToFileWriter();
}
@Bean
public Step partitionStep() {
return myStepBuilderFactory.get("partitionStep")
@@ -123,20 +138,10 @@
@Bean
@StepScope
public ItemProcessor<ResourcePersistentId, IBaseResource> pidToResourceProcessor(@Value("#{jobParameters['jobUUID']}") String theUUID) {
public ItemProcessor<ResourcePersistentId, IBaseResource> pidToResourceProcessor() {
return new BulkItemResourceLoaderProcessor();
}
@Bean
public ItemWriter<IBaseResource> mySimplePrinter() {
return (theResourcePersistentIds) -> {
System.out.println("PRINTING CHUNK");
theResourcePersistentIds.stream().forEach(theIBaseResource-> {
System.out.println("zoop -> " + theIBaseResource);
});
};
}
@Bean
@StepScope
public BulkItemReader myBulkItemReader(@Value("#{jobParameters['jobUUID']}") String theJobUUID) {