From 34010fa78d76b59dd1e3c75368a6cc8350bd7929 Mon Sep 17 00:00:00 2001 From: StevenXLi Date: Tue, 19 Oct 2021 09:07:05 -0400 Subject: [PATCH] Batch job refactor (#3087) * added AbstractResourceToFileWriter * Added AbstractBaseBulkItemReader * added bulkExportStatusEnum fromBatchStatus test * Code review name change Co-authored-by: Long Ma Co-authored-by: Steven Li --- .../export/job/BaseJpaBulkItemReader.java | 110 ++++++++++++++++++ .../bulk/export/job/BulkExportJobConfig.java | 10 +- .../jpa/bulk/export/job/BulkItemReader.java | 3 +- .../bulk/export/job/GroupBulkItemReader.java | 2 +- .../export/job/PatientBulkItemReader.java | 2 +- .../bulk/export/job/ResourceToFileWriter.java | 72 +----------- .../bulk/export/job/BaseBulkItemReader.java | 97 +++------------ .../export/job/BaseResourceToFileWriter.java | 101 ++++++++++++++++ .../export/model/BulkExportJobStatusEnum.java | 16 ++- .../bulk/model/BulkExportStatusEnumTest.java | 21 ++++ 10 files changed, 282 insertions(+), 152 deletions(-) create mode 100644 hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseJpaBulkItemReader.java rename {hapi-fhir-jpaserver-base => hapi-fhir-storage}/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java (54%) create mode 100644 hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseResourceToFileWriter.java create mode 100644 hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/bulk/model/BulkExportStatusEnumTest.java diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseJpaBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseJpaBulkItemReader.java new file mode 100644 index 00000000000..33d5583f5f2 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseJpaBulkItemReader.java @@ -0,0 +1,110 @@ +package ca.uhn.fhir.jpa.bulk.export.job; + +/*- + * #%L + * HAPI FHIR JPA Server + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.context.RuntimeSearchParam; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.batch.config.BatchConstants; +import ca.uhn.fhir.jpa.dao.ISearchBuilder; +import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; +import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; +import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.util.SearchParameterUtil; +import ca.uhn.fhir.util.UrlUtil; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; + +import java.util.Date; +import java.util.Map; +import java.util.Optional; + +public abstract class BaseJpaBulkItemReader extends BaseBulkItemReader { + @Value("#{jobExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}") + protected String myJobUUID; + @Autowired + protected DaoRegistry myDaoRegistry; + @Autowired + protected SearchBuilderFactory mySearchBuilderFactory; + @Autowired + private IBulkExportJobDao myBulkExportJobDao; + + private ISearchBuilder mySearchBuilder; + private BulkExportJobEntity myJobEntity; + + private RuntimeSearchParam myPatientSearchParam; + + /** + * Get and cache an ISearchBuilder for the given resource type this partition is responsible for. + */ + protected ISearchBuilder getSearchBuilderForLocalResourceType() { + if (mySearchBuilder == null) { + IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType); + RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType); + Class nextTypeClass = def.getImplementingClass(); + mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass); + } + return mySearchBuilder; + } + + @Override + protected String[] getTypeFilterList() { + BulkExportJobEntity jobEntity = getJobEntity(); + Map requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest()); + return requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER); + } + + @Override + protected Date getSinceDate() { + return getJobEntity().getSince(); + } + + @Override + protected String getLogInfoForRead() { + return "Bulk export starting generation for batch export job: " + getJobEntity() + " with resourceType " + myResourceType + " and UUID " + myJobUUID; + } + + protected BulkExportJobEntity getJobEntity() { + if (myJobEntity == null) { + Optional jobOpt = myBulkExportJobDao.findByJobId(myJobUUID); + if (jobOpt.isPresent()) { + myJobEntity = jobOpt.get(); + } else { + String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID); + throw new IllegalStateException(errorMessage); + } + } + return myJobEntity; + } + + protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() { + if (myPatientSearchParam == null) { + Optional onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); + if (onlyPatientSearchParamForResourceType.isPresent()) { + myPatientSearchParam = onlyPatientSearchParamForResourceType.get(); + } + } + return myPatientSearchParam; + } +} diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java index 312266b788a..34cf94f4985 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkExportJobConfig.java @@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.bulk.export.job; * #L% */ +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.batch.processor.GoldenResourceAnnotatingProcessor; import ca.uhn.fhir.jpa.batch.processor.PidToIBaseResourceProcessor; @@ -58,6 +60,12 @@ public class BulkExportJobConfig { public static final int CHUNK_SIZE = 100; public static final String JOB_DESCRIPTION = "jobDescription"; + @Autowired + private FhirContext myFhirContext; + + @Autowired + private DaoRegistry myDaoRegistry; + @Autowired private StepBuilderFactory myStepBuilderFactory; @@ -265,7 +273,7 @@ public class BulkExportJobConfig { @Bean @StepScope public ResourceToFileWriter resourceToFileWriter() { - return new ResourceToFileWriter(); + return new ResourceToFileWriter(myFhirContext, myDaoRegistry); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkItemReader.java index 83f7c7d0d50..39952f9c7f3 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BulkItemReader.java @@ -38,7 +38,7 @@ import java.util.Set; * Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param * on a given resource type. */ -public class BulkItemReader extends BaseBulkItemReader { +public class BulkItemReader extends BaseJpaBulkItemReader { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); @Override @@ -46,7 +46,6 @@ public class BulkItemReader extends BaseBulkItemReader { ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID); Set myReadPids = new HashSet<>(); - List map = createSearchParameterMapsForResourceType(); ISearchBuilder sb = getSearchBuilderForLocalResourceType(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java index f857e07963e..d1cbe5112a8 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/GroupBulkItemReader.java @@ -64,7 +64,7 @@ import java.util.stream.Collectors; * 3. Optionally further expand that into all MDM-matched Patients (including golden resources) * 4. Then perform normal bulk export, filtered so that only results that refer to members are returned. */ -public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader> { +public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemReader> { private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); public static final int QUERY_CHUNK_SIZE = 100; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/PatientBulkItemReader.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/PatientBulkItemReader.java index 93519863ec7..0a89c3a5f0d 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/PatientBulkItemReader.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/PatientBulkItemReader.java @@ -45,7 +45,7 @@ import java.util.List; * 1. Determine the resourcetype * 2. Search for anything that has `patient-compartment-search-param:missing=false` */ -public class PatientBulkItemReader extends BaseBulkItemReader implements ItemReader> { +public class PatientBulkItemReader extends BaseJpaBulkItemReader implements ItemReader> { @Autowired private DaoConfig myDaoConfig; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java index d0655ca0619..8f3b6006292 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/ResourceToFileWriter.java @@ -21,68 +21,32 @@ package ca.uhn.fhir.jpa.bulk.export.job; */ import ca.uhn.fhir.context.FhirContext; -import ca.uhn.fhir.interceptor.model.RequestPartitionId; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; -import ca.uhn.fhir.jpa.batch.log.Logs; import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc; import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity; -import ca.uhn.fhir.jpa.partition.SystemRequestDetails; -import ca.uhn.fhir.parser.IParser; -import ca.uhn.fhir.rest.api.Constants; -import ca.uhn.fhir.util.BinaryUtil; import org.hl7.fhir.instance.model.api.IBaseBinary; -import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import org.slf4j.Logger; -import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import javax.annotation.PostConstruct; -import java.io.ByteArrayOutputStream; -import java.io.OutputStreamWriter; -import java.util.List; import java.util.Optional; -public class ResourceToFileWriter implements ItemWriter> { - private static final Logger ourLog = Logs.getBatchTroubleshootingLog(); - - @Autowired - private FhirContext myFhirContext; - - @Autowired - private DaoRegistry myDaoRegistry; - +public class ResourceToFileWriter extends BaseResourceToFileWriter { @Autowired private BulkExportDaoSvc myBulkExportDaoSvc; - private final ByteArrayOutputStream myOutputStream; - private final OutputStreamWriter myWriter; - private IParser myParser; - - @Value("#{stepExecutionContext['bulkExportCollectionEntityId']}") - private Long myBulkExportCollectionEntityId; - - @Value("#{stepExecutionContext['resourceType']}") - private String myResourceType; - - private IFhirResourceDao myBinaryDao; - - - public ResourceToFileWriter() { - myOutputStream = new ByteArrayOutputStream(); - myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8); + public ResourceToFileWriter(FhirContext theFhirContext, DaoRegistry theDaoRegistry) { + super(theFhirContext, theDaoRegistry); } @PostConstruct public void start() { - myParser = myFhirContext.newJsonParser().setPrettyPrint(false); myBinaryDao = getBinaryDao(); } - private Optional flushToFiles() { + @Override + protected Optional flushToFiles() { if (myOutputStream.size() > 0) { IIdType createdId = createBinaryFromOutputStream(); BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity(); @@ -98,34 +62,8 @@ public class ResourceToFileWriter implements ItemWriter> { return Optional.empty(); } - private IIdType createBinaryFromOutputStream() { - IBaseBinary binary = BinaryUtil.newBinary(myFhirContext); - binary.setContentType(Constants.CT_FHIR_NDJSON); - binary.setContent(myOutputStream.toByteArray()); - DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition())); - return outcome.getResource().getIdElement(); - } - @SuppressWarnings("unchecked") private IFhirResourceDao getBinaryDao() { return myDaoRegistry.getResourceDao("Binary"); } - - @Override - public void write(List> theList) throws Exception { - - int count = 0; - for (List resourceList : theList) { - for (IBaseResource nextFileResource : resourceList) { - myParser.encodeResourceToWriter(nextFileResource, myWriter); - myWriter.append("\n"); - count++; - } - } - - Optional createdId = flushToFiles(); - if (createdId.isPresent()) { - ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType); - } - } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java similarity index 54% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java rename to hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java index 6f647bd0e58..906c5176db7 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseBulkItemReader.java @@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.bulk.export.job; /*- * #%L - * HAPI FHIR JPA Server + * HAPI FHIR Storage api * %% * Copyright (C) 2014 - 2021 Smile CDR, Inc. * %% @@ -22,23 +22,11 @@ package ca.uhn.fhir.jpa.bulk.export.job; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; -import ca.uhn.fhir.context.RuntimeSearchParam; -import ca.uhn.fhir.jpa.api.dao.DaoRegistry; -import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.batch.config.BatchConstants; import ca.uhn.fhir.jpa.batch.log.Logs; -import ca.uhn.fhir.jpa.dao.ISearchBuilder; -import ca.uhn.fhir.jpa.dao.SearchBuilderFactory; -import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao; -import ca.uhn.fhir.jpa.entity.BulkExportJobEntity; -import ca.uhn.fhir.jpa.model.util.JpaConstants; import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId; import ca.uhn.fhir.rest.param.DateRangeParam; -import ca.uhn.fhir.util.SearchParameterUtil; -import ca.uhn.fhir.util.UrlUtil; -import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.annotation.Autowired; @@ -47,10 +35,9 @@ import org.springframework.beans.factory.annotation.Value; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; public abstract class BaseBulkItemReader implements ItemReader> { @@ -58,57 +45,36 @@ public abstract class BaseBulkItemReader implements ItemReader myPidIterator; private RuntimeResourceDefinition myResourceDefinition; - private Iterator myPidIterator; - private RuntimeSearchParam myPatientSearchParam; - /** - * Get and cache an ISearchBuilder for the given resource type this partition is responsible for. - */ - protected ISearchBuilder getSearchBuilderForLocalResourceType() { - if (mySearchBuilder == null) { - IFhirResourceDao dao = myDaoRegistry.getResourceDao(myResourceType); - RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType); - Class nextTypeClass = def.getImplementingClass(); - mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass); - } - return mySearchBuilder; - } - - /** - * Generate the list of pids of all resources of the given myResourceType, which reference any group member of the given myGroupId. + * Generate the list of PIDs of all resources of the given myResourceType, which reference any group member of the given myGroupId. * Store them in a member iterator. */ - protected void loadResourcePids() { - //Initialize an array to hold the pids of the target resources to be exported. + protected void loadResourcePIDs() { + //Initialize an array to hold the PIDs of the target resources to be exported. myPidIterator = getResourcePidIterator(); } protected abstract Iterator getResourcePidIterator(); + protected abstract String[] getTypeFilterList(); + + protected abstract Date getSinceDate(); + + protected abstract String getLogInfoForRead(); + protected List createSearchParameterMapsForResourceType() { - BulkExportJobEntity jobEntity = getJobEntity(); RuntimeResourceDefinition theDef = getResourceDefinition(); - Map requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest()); - String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER); + String[] typeFilters = getTypeFilterList(); List spMaps = null; if (typeFilters != null) { spMaps = Arrays.stream(typeFilters) @@ -129,8 +95,8 @@ public abstract class BaseBulkItemReader implements ItemReader jobOpt = myBulkExportJobDao.findByJobId(myJobUUID); - if (jobOpt.isPresent()) { - myJobEntity = jobOpt.get(); - } else { - String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID); - throw new IllegalStateException(errorMessage); - } - } - return myJobEntity; - } - @Override public List read() { - - ourLog.info("Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", getJobEntity(), myResourceType, myJobUUID); + ourLog.info(getLogInfoForRead()); if (myPidIterator == null) { - loadResourcePids(); + loadResourcePIDs(); } int count = 0; @@ -177,18 +129,5 @@ public abstract class BaseBulkItemReader implements ItemReader onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType); - if (onlyPatientSearchParamForResourceType.isPresent()) { - myPatientSearchParam = onlyPatientSearchParamForResourceType.get(); - } else { - - } - } - return myPatientSearchParam; } } diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseResourceToFileWriter.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseResourceToFileWriter.java new file mode 100644 index 00000000000..bc36d3c8e02 --- /dev/null +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/job/BaseResourceToFileWriter.java @@ -0,0 +1,101 @@ +package ca.uhn.fhir.jpa.bulk.export.job; + +/*- + * #%L + * HAPI FHIR Storage api + * %% + * Copyright (C) 2014 - 2021 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; +import ca.uhn.fhir.jpa.batch.log.Logs; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; +import ca.uhn.fhir.parser.IParser; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.util.BinaryUtil; +import org.hl7.fhir.instance.model.api.IBaseBinary; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.slf4j.Logger; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Value; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; +import java.util.List; +import java.util.Optional; + +public abstract class BaseResourceToFileWriter implements ItemWriter> { + protected static final Logger ourLog = Logs.getBatchTroubleshootingLog(); + + protected FhirContext myFhirContext; + + protected DaoRegistry myDaoRegistry; + + protected ByteArrayOutputStream myOutputStream; + + @Value("#{stepExecutionContext['bulkExportCollectionEntityId']}") + protected Long myBulkExportCollectionEntityId; + + @Value("#{stepExecutionContext['resourceType']}") + protected String myResourceType; + + protected IFhirResourceDao myBinaryDao; + private final OutputStreamWriter myWriter; + private final IParser myParser; + + protected BaseResourceToFileWriter(FhirContext theFhirContext, DaoRegistry theDaoRegistry) { + myFhirContext = theFhirContext; + myDaoRegistry = theDaoRegistry; + myParser = myFhirContext.newJsonParser().setPrettyPrint(false); + myOutputStream = new ByteArrayOutputStream(); + myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8); + } + + + protected IIdType createBinaryFromOutputStream() { + IBaseBinary binary = BinaryUtil.newBinary(myFhirContext); + binary.setContentType(Constants.CT_FHIR_NDJSON); + binary.setContent(myOutputStream.toByteArray()); + DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition())); + return outcome.getResource().getIdElement(); + } + + @Override + public void write(List> theList) throws Exception { + + int count = 0; + for (List resourceList : theList) { + for (IBaseResource nextFileResource : resourceList) { + myParser.encodeResourceToWriter(nextFileResource, myWriter); + myWriter.append("\n"); + count++; + } + } + + Optional createdId = flushToFiles(); + if (createdId.isPresent()) { + ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType); + } + } + + protected abstract Optional flushToFiles(); + +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/BulkExportJobStatusEnum.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/BulkExportJobStatusEnum.java index 482dd003318..0f39884e3ed 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/BulkExportJobStatusEnum.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/bulk/export/model/BulkExportJobStatusEnum.java @@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.bulk.export.model; */ import com.fasterxml.jackson.annotation.JsonFormat; +import org.springframework.batch.core.BatchStatus; @JsonFormat(shape = JsonFormat.Shape.STRING) public enum BulkExportJobStatusEnum { @@ -32,6 +33,19 @@ public enum BulkExportJobStatusEnum { SUBMITTED, BUILDING, COMPLETE, - ERROR + ERROR; + + public static BulkExportJobStatusEnum fromBatchStatus(BatchStatus status) { + switch (status) { + case STARTING : + return BulkExportJobStatusEnum.SUBMITTED; + case COMPLETED : + return BulkExportJobStatusEnum.COMPLETE; + case STARTED : + return BulkExportJobStatusEnum.BUILDING; + default : + return BulkExportJobStatusEnum.ERROR; + } + } } diff --git a/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/bulk/model/BulkExportStatusEnumTest.java b/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/bulk/model/BulkExportStatusEnumTest.java new file mode 100644 index 00000000000..1409fab925b --- /dev/null +++ b/hapi-fhir-storage/src/test/java/ca/uhn/fhir/jpa/bulk/model/BulkExportStatusEnumTest.java @@ -0,0 +1,21 @@ +package ca.uhn.fhir.jpa.bulk.model; + +import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum; +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.BatchStatus; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BulkExportStatusEnumTest { + @Test + public void testFromBatchStatus(){ + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STARTED), BulkExportJobStatusEnum.BUILDING); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STARTING), BulkExportJobStatusEnum.SUBMITTED); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.COMPLETED), BulkExportJobStatusEnum.COMPLETE); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STOPPING), BulkExportJobStatusEnum.ERROR); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STOPPED), BulkExportJobStatusEnum.ERROR); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.FAILED), BulkExportJobStatusEnum.ERROR); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.ABANDONED), BulkExportJobStatusEnum.ERROR); + assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.UNKNOWN), BulkExportJobStatusEnum.ERROR); + } +}