Rework to read and process larger chunks

Tadgh 2020-06-09 09:55:10 -07:00
parent 9c153527dd
commit 7273457ea7
4 changed files with 33 additions and 43 deletions

View File

@@ -16,9 +16,9 @@ import java.util.Collections;
 import java.util.List;
 /**
- * Reusable Item Processor which converts a ResourcePersistentId to its IBaseResource
+ * Reusable Item Processor which converts ResourcePersistentIds to their IBaseResources
  */
-public class PidToIBaseResourceProcessor implements ItemProcessor<ResourcePersistentId, IBaseResource> {
+public class PidToIBaseResourceProcessor implements ItemProcessor<List<ResourcePersistentId>, List<IBaseResource>> {
    @Autowired
    private SearchBuilderFactory mySearchBuilderFactory;
@@ -33,15 +33,15 @@ public class PidToIBaseResourceProcessor implements ItemProcessor<ResourcePersis
    private FhirContext myContext;
    @Override
-   public IBaseResource process(ResourcePersistentId theResourcePersistentId) throws Exception {
+   public List<IBaseResource> process(List<ResourcePersistentId> theResourcePersistentId) throws Exception {
       IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType);
       Class<? extends IBaseResource> resourceTypeClass = myContext.getResourceDefinition(myResourceType).getImplementingClass();
       ISearchBuilder sb = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, resourceTypeClass);
       List<IBaseResource> outgoing = new ArrayList<>();
-      sb.loadResourcesByPid(Collections.singletonList(theResourcePersistentId), Collections.emptyList(), outgoing, false, null);
-      return outgoing.get(0);
+      sb.loadResourcesByPid(theResourcePersistentId, Collections.emptyList(), outgoing, false, null);
+      return outgoing;
    }
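For context on the change above: the processor now implements Spring Batch's list-in/list-out contract instead of converting one id at a time. A minimal, standalone sketch of that contract, with a made-up UppercaseListProcessor class and plain String items standing in for the HAPI types:

import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ItemProcessor;

// Illustrative sketch only (not HAPI code): like the reworked PidToIBaseResourceProcessor,
// this processor receives the whole list emitted by one reader.read() call and returns a
// transformed list of the same size.
public class UppercaseListProcessor implements ItemProcessor<List<String>, List<String>> {
   @Override
   public List<String> process(List<String> theItems) throws Exception {
      List<String> outgoing = new ArrayList<>(theItems.size());
      for (String next : theItems) {
         outgoing.add(next.toUpperCase());
      }
      return outgoing;
   }
}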

View File

@@ -16,6 +16,8 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.task.TaskExecutor;
+import java.util.List;
 /**
  * Spring batch Job configuration file. Contains all necessary plumbing to run a
  * Bulk Export job.
@@ -23,6 +25,8 @@ import org.springframework.core.task.TaskExecutor;
 @Configuration
 public class BulkExportJobConfig {
+   public static final String JOB_UUID_KEY = "jobUUID";
    @Autowired
    private StepBuilderFactory myStepBuilderFactory;
@@ -46,7 +50,7 @@ public class BulkExportJobConfig {
    @Bean
    public Step bulkExportGenerateResourceFilesStep() {
       return myStepBuilderFactory.get("bulkExportGenerateResourceFilesStep")
-         .<ResourcePersistentId, IBaseResource> chunk(1000) //1000 resources per generated file
+         .<List<ResourcePersistentId>, List<IBaseResource>> chunk(100) //1000 resources per generated file, as the reader returns 10 resources at a time.
          //TODO should we potentially make this configurable?
          .reader(bulkItemReader(null))
          .processor(myPidToIBaseResourceProcessor)
@@ -85,7 +89,7 @@ public class BulkExportJobConfig {
    @Bean
    @StepScope
-   public ItemWriter<IBaseResource> resourceToFileWriter() {
+   public ItemWriter<List<IBaseResource>> resourceToFileWriter() {
       return new ResourceToFileWriter();
    }
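A note on the numbers in the step above: chunk(100) now counts List items, and each list carries up to 10 ids from the reader, so one chunk (and one generated file) still corresponds to roughly 100 * 10 = 1000 resources. A simplified, hypothetical configuration (ListChunkStepExampleConfig, with Long/String item types instead of the HAPI classes) showing that wiring:

import java.util.List;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical example, not the real BulkExportJobConfig: the reader emits List<Long> items
// of up to 10 elements, and the step commits after 100 such items, i.e. about 1000 ids per chunk.
@Configuration
public class ListChunkStepExampleConfig {

   @Bean
   public Step listChunkStepExample(StepBuilderFactory theStepBuilderFactory,
                                    ItemReader<List<Long>> theListReader,
                                    ItemProcessor<List<Long>, List<String>> theListProcessor,
                                    ItemWriter<List<String>> theListWriter) {
      return theStepBuilderFactory.get("listChunkStepExample")
         .<List<Long>, List<String>>chunk(100)
         .reader(theListReader)
         .processor(theListProcessor)
         .writer(theListWriter)
         .build();
   }
}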

View File

@@ -28,9 +28,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-public class BulkItemReader implements ItemReader<ResourcePersistentId> {
+public class BulkItemReader implements ItemReader<List<ResourcePersistentId>> {
    private static final Logger ourLog = LoggerFactory.getLogger(BulkItemReader.class);
+   private static final int READ_CHUNK_SIZE = 10;
    @Autowired
    private IBulkExportJobDao myBulkExportJobDao;
@@ -87,14 +89,18 @@ public class BulkItemReader implements ItemReader<ResourcePersistentId> {
    }
    @Override
-   public ResourcePersistentId read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
+   public List<ResourcePersistentId> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
       if (myPidIterator == null) {
          loadResourcePids();
       }
-      if (myPidIterator.hasNext()) {
-         return myPidIterator.next();
-      } else {
-         return null;
+      int count = 0;
+      List<ResourcePersistentId> outgoing = new ArrayList<>();
+      while (myPidIterator.hasNext() && count < READ_CHUNK_SIZE) {
+         outgoing.add(myPidIterator.next());
+         count += 1;
       }
+      return outgoing.size() == 0 ? null : outgoing;
    }
 }
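The reworked read() above drains the id iterator in fixed-size slices and returns null once it is exhausted, which is how a Spring Batch ItemReader signals end of input. A minimal standalone sketch of the same pattern, assuming plain Long ids and a hypothetical ChunkedIteratorReader class:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.springframework.batch.item.ItemReader;

// Illustrative only: drain up to READ_CHUNK_SIZE ids per read() call, Long standing in
// for ResourcePersistentId. Returning null tells Spring Batch the reader is exhausted.
public class ChunkedIteratorReader implements ItemReader<List<Long>> {

   private static final int READ_CHUNK_SIZE = 10;
   private final Iterator<Long> myIdIterator;

   public ChunkedIteratorReader(Iterator<Long> theIdIterator) {
      myIdIterator = theIdIterator;
   }

   @Override
   public List<Long> read() {
      List<Long> outgoing = new ArrayList<>(READ_CHUNK_SIZE);
      while (myIdIterator.hasNext() && outgoing.size() < READ_CHUNK_SIZE) {
         outgoing.add(myIdIterator.next());
      }
      return outgoing.isEmpty() ? null : outgoing;
   }
}

Note that the final, possibly short list is still returned rather than dropped: the last slice may hold fewer than READ_CHUNK_SIZE ids but still has to be processed and written.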

View File

@@ -13,9 +13,6 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
 import org.hl7.fhir.instance.model.api.IIdType;
 import org.slf4j.Logger;
 import org.springframework.batch.item.ItemWriter;
-import org.springframework.batch.repeat.CompletionPolicy;
-import org.springframework.batch.repeat.RepeatContext;
-import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -27,7 +24,7 @@ import java.util.Optional;
 import static org.slf4j.LoggerFactory.getLogger;
-public class ResourceToFileWriter implements ItemWriter<IBaseResource>, CompletionPolicy {
+public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
    private static final Logger ourLog = getLogger(ResourceToFileWriter.class);
    @Autowired
@@ -84,17 +81,6 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
       return myBinaryDao.create(binary).getResource().getIdElement();
    }
-   @Override
-   public void write(List<? extends IBaseResource> resources) throws Exception {
-      for (IBaseResource nextFileResource : resources) {
-         myParser.encodeResourceToWriter(nextFileResource, myWriter);
-         myWriter.append("\n");
-      }
-      Optional<IIdType> createdId = flushToFiles();
-      createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {} resources of type ", theIIdType.toUnqualifiedVersionless().getValue()));
-   }
    @SuppressWarnings("unchecked")
    private IFhirResourceDao<IBaseBinary> getBinaryDao() {
@@ -102,22 +88,16 @@ public class ResourceToFileWriter implements ItemWriter<IBaseResource>, Completi
    }
    @Override
-   public boolean isComplete(RepeatContext theRepeatContext, RepeatStatus theRepeatStatus) {
-      return false;
-   }
-   @Override
-   public boolean isComplete(RepeatContext theRepeatContext) {
-      return false;
-   }
-   @Override
-   public RepeatContext start(RepeatContext theRepeatContext) {
-      return null;
-   }
-   @Override
-   public void update(RepeatContext theRepeatContext) {
+   public void write(List<? extends List<IBaseResource>> theList) throws Exception {
+      for (List<IBaseResource> resourceList : theList) {
+         for (IBaseResource nextFileResource : resourceList) {
+            myParser.encodeResourceToWriter(nextFileResource, myWriter);
+            myWriter.append("\n");
+         }
+      }
+      Optional<IIdType> createdId = flushToFiles();
+      createdId.ifPresent(theIIdType -> ourLog.warn("Created resources for bulk export file containing {} resources of type ", theIIdType.toUnqualifiedVersionless().getValue()));
    }
 }
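Finally, because the step's item type is now List<IBaseResource>, Spring Batch hands the writer a chunk of lists, which is why the new write() unwraps two levels before encoding each resource. A simplified, self-contained sketch of that shape, with a hypothetical NewlineDelimitedListWriter, String payloads, and an in-memory StringWriter standing in for the export file stream:

import java.io.StringWriter;
import java.io.Writer;
import java.util.List;
import org.springframework.batch.item.ItemWriter;

// Illustrative only: the writer receives a chunk of lists (one list per reader/processor item)
// and flattens them, appending one line per element, mirroring the reworked write() above.
public class NewlineDelimitedListWriter implements ItemWriter<List<String>> {

   private final Writer myWriter = new StringWriter();

   @Override
   public void write(List<? extends List<String>> theLists) throws Exception {
      for (List<String> nextList : theLists) {
         for (String nextLine : nextList) {
            myWriter.append(nextLine);
            myWriter.append("\n");
         }
      }
      myWriter.flush();
   }
}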