Batch job refactor (#3087)
* added AbstractResourceToFileWriter * Added AbstractBaseBulkItemReader * added bulkExportStatusEnum fromBatchStatus test * Code review name change Co-authored-by: Long Ma <longma@Longs-MBP.hitronhub.home> Co-authored-by: Steven Li <steven@smilecdr.com>
This commit is contained in:
parent
92e9859272
commit
34010fa78d
|
@ -0,0 +1,110 @@
|
|||
package ca.uhn.fhir.jpa.bulk.export.job;
|
||||
|
||||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR JPA Server
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
|
||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||
import ca.uhn.fhir.context.RuntimeSearchParam;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
|
||||
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
|
||||
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
|
||||
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
|
||||
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
|
||||
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
||||
import ca.uhn.fhir.util.SearchParameterUtil;
|
||||
import ca.uhn.fhir.util.UrlUtil;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public abstract class BaseJpaBulkItemReader extends BaseBulkItemReader {
|
||||
@Value("#{jobExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
|
||||
protected String myJobUUID;
|
||||
@Autowired
|
||||
protected DaoRegistry myDaoRegistry;
|
||||
@Autowired
|
||||
protected SearchBuilderFactory mySearchBuilderFactory;
|
||||
@Autowired
|
||||
private IBulkExportJobDao myBulkExportJobDao;
|
||||
|
||||
private ISearchBuilder mySearchBuilder;
|
||||
private BulkExportJobEntity myJobEntity;
|
||||
|
||||
private RuntimeSearchParam myPatientSearchParam;
|
||||
|
||||
/**
|
||||
* Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
|
||||
*/
|
||||
protected ISearchBuilder getSearchBuilderForLocalResourceType() {
|
||||
if (mySearchBuilder == null) {
|
||||
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
|
||||
RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
|
||||
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
|
||||
mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
|
||||
}
|
||||
return mySearchBuilder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String[] getTypeFilterList() {
|
||||
BulkExportJobEntity jobEntity = getJobEntity();
|
||||
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest());
|
||||
return requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Date getSinceDate() {
|
||||
return getJobEntity().getSince();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getLogInfoForRead() {
|
||||
return "Bulk export starting generation for batch export job: " + getJobEntity() + " with resourceType " + myResourceType + " and UUID " + myJobUUID;
|
||||
}
|
||||
|
||||
protected BulkExportJobEntity getJobEntity() {
|
||||
if (myJobEntity == null) {
|
||||
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
|
||||
if (jobOpt.isPresent()) {
|
||||
myJobEntity = jobOpt.get();
|
||||
} else {
|
||||
String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID);
|
||||
throw new IllegalStateException(errorMessage);
|
||||
}
|
||||
}
|
||||
return myJobEntity;
|
||||
}
|
||||
|
||||
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
|
||||
if (myPatientSearchParam == null) {
|
||||
Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType);
|
||||
if (onlyPatientSearchParamForResourceType.isPresent()) {
|
||||
myPatientSearchParam = onlyPatientSearchParamForResourceType.get();
|
||||
}
|
||||
}
|
||||
return myPatientSearchParam;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,8 @@ package ca.uhn.fhir.jpa.bulk.export.job;
|
|||
* #L%
|
||||
*/
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
|
||||
import ca.uhn.fhir.jpa.batch.processor.GoldenResourceAnnotatingProcessor;
|
||||
import ca.uhn.fhir.jpa.batch.processor.PidToIBaseResourceProcessor;
|
||||
|
@ -58,6 +60,12 @@ public class BulkExportJobConfig {
|
|||
public static final int CHUNK_SIZE = 100;
|
||||
public static final String JOB_DESCRIPTION = "jobDescription";
|
||||
|
||||
@Autowired
|
||||
private FhirContext myFhirContext;
|
||||
|
||||
@Autowired
|
||||
private DaoRegistry myDaoRegistry;
|
||||
|
||||
@Autowired
|
||||
private StepBuilderFactory myStepBuilderFactory;
|
||||
|
||||
|
@ -265,7 +273,7 @@ public class BulkExportJobConfig {
|
|||
@Bean
|
||||
@StepScope
|
||||
public ResourceToFileWriter resourceToFileWriter() {
|
||||
return new ResourceToFileWriter();
|
||||
return new ResourceToFileWriter(myFhirContext, myDaoRegistry);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ import java.util.Set;
|
|||
* Basic Bulk Export implementation which simply reads all type filters and applies them, along with the _since param
|
||||
* on a given resource type.
|
||||
*/
|
||||
public class BulkItemReader extends BaseBulkItemReader {
|
||||
public class BulkItemReader extends BaseJpaBulkItemReader {
|
||||
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
||||
|
||||
@Override
|
||||
|
@ -46,7 +46,6 @@ public class BulkItemReader extends BaseBulkItemReader {
|
|||
ourLog.info("Bulk export assembling export of type {} for job {}", myResourceType, myJobUUID);
|
||||
Set<ResourcePersistentId> myReadPids = new HashSet<>();
|
||||
|
||||
|
||||
List<SearchParameterMap> map = createSearchParameterMapsForResourceType();
|
||||
ISearchBuilder sb = getSearchBuilderForLocalResourceType();
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ import java.util.stream.Collectors;
|
|||
* 3. Optionally further expand that into all MDM-matched Patients (including golden resources)
|
||||
* 4. Then perform normal bulk export, filtered so that only results that refer to members are returned.
|
||||
*/
|
||||
public class GroupBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||
public class GroupBulkItemReader extends BaseJpaBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
||||
public static final int QUERY_CHUNK_SIZE = 100;
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ import java.util.List;
|
|||
* 1. Determine the resourcetype
|
||||
* 2. Search for anything that has `patient-compartment-search-param:missing=false`
|
||||
*/
|
||||
public class PatientBulkItemReader extends BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||
public class PatientBulkItemReader extends BaseJpaBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||
@Autowired
|
||||
private DaoConfig myDaoConfig;
|
||||
|
||||
|
|
|
@ -21,68 +21,32 @@ package ca.uhn.fhir.jpa.bulk.export.job;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
|
||||
import ca.uhn.fhir.jpa.batch.log.Logs;
|
||||
import ca.uhn.fhir.jpa.bulk.export.svc.BulkExportDaoSvc;
|
||||
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.util.BinaryUtil;
|
||||
import org.hl7.fhir.instance.model.api.IBaseBinary;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
||||
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
||||
|
||||
@Autowired
|
||||
private FhirContext myFhirContext;
|
||||
|
||||
@Autowired
|
||||
private DaoRegistry myDaoRegistry;
|
||||
|
||||
public class ResourceToFileWriter extends BaseResourceToFileWriter {
|
||||
@Autowired
|
||||
private BulkExportDaoSvc myBulkExportDaoSvc;
|
||||
|
||||
private final ByteArrayOutputStream myOutputStream;
|
||||
private final OutputStreamWriter myWriter;
|
||||
private IParser myParser;
|
||||
|
||||
@Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
|
||||
private Long myBulkExportCollectionEntityId;
|
||||
|
||||
@Value("#{stepExecutionContext['resourceType']}")
|
||||
private String myResourceType;
|
||||
|
||||
private IFhirResourceDao<IBaseBinary> myBinaryDao;
|
||||
|
||||
|
||||
public ResourceToFileWriter() {
|
||||
myOutputStream = new ByteArrayOutputStream();
|
||||
myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8);
|
||||
public ResourceToFileWriter(FhirContext theFhirContext, DaoRegistry theDaoRegistry) {
|
||||
super(theFhirContext, theDaoRegistry);
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
myParser = myFhirContext.newJsonParser().setPrettyPrint(false);
|
||||
myBinaryDao = getBinaryDao();
|
||||
}
|
||||
|
||||
private Optional<IIdType> flushToFiles() {
|
||||
@Override
|
||||
protected Optional<IIdType> flushToFiles() {
|
||||
if (myOutputStream.size() > 0) {
|
||||
IIdType createdId = createBinaryFromOutputStream();
|
||||
BulkExportCollectionFileEntity file = new BulkExportCollectionFileEntity();
|
||||
|
@ -98,34 +62,8 @@ public class ResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
|||
return Optional.empty();
|
||||
}
|
||||
|
||||
private IIdType createBinaryFromOutputStream() {
|
||||
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
|
||||
binary.setContentType(Constants.CT_FHIR_NDJSON);
|
||||
binary.setContent(myOutputStream.toByteArray());
|
||||
DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
|
||||
return outcome.getResource().getIdElement();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private IFhirResourceDao<IBaseBinary> getBinaryDao() {
|
||||
return myDaoRegistry.getResourceDao("Binary");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(List<? extends List<IBaseResource>> theList) throws Exception {
|
||||
|
||||
int count = 0;
|
||||
for (List<IBaseResource> resourceList : theList) {
|
||||
for (IBaseResource nextFileResource : resourceList) {
|
||||
myParser.encodeResourceToWriter(nextFileResource, myWriter);
|
||||
myWriter.append("\n");
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<IIdType> createdId = flushToFiles();
|
||||
if (createdId.isPresent()) {
|
||||
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.bulk.export.job;
|
|||
|
||||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR JPA Server
|
||||
* HAPI FHIR Storage api
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
|
||||
* %%
|
||||
|
@ -22,23 +22,11 @@ package ca.uhn.fhir.jpa.bulk.export.job;
|
|||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.context.RuntimeResourceDefinition;
|
||||
import ca.uhn.fhir.context.RuntimeSearchParam;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
|
||||
import ca.uhn.fhir.jpa.batch.log.Logs;
|
||||
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
|
||||
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
|
||||
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
|
||||
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
|
||||
import ca.uhn.fhir.jpa.model.util.JpaConstants;
|
||||
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
|
||||
import ca.uhn.fhir.rest.param.DateRangeParam;
|
||||
import ca.uhn.fhir.util.SearchParameterUtil;
|
||||
import ca.uhn.fhir.util.UrlUtil;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
@ -47,10 +35,9 @@ import org.springframework.beans.factory.annotation.Value;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePersistentId>> {
|
||||
|
@ -58,57 +45,36 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
|
|||
|
||||
@Value("#{stepExecutionContext['resourceType']}")
|
||||
protected String myResourceType;
|
||||
@Value("#{jobExecutionContext['" + BatchConstants.JOB_UUID_PARAMETER + "']}")
|
||||
protected String myJobUUID;
|
||||
@Value("#{jobParameters['" + BulkExportJobConfig.READ_CHUNK_PARAMETER + "']}")
|
||||
@Value("#{jobParameters['readChunkSize']}")
|
||||
protected Long myReadChunkSize;
|
||||
@Autowired
|
||||
protected DaoRegistry myDaoRegistry;
|
||||
@Autowired
|
||||
protected FhirContext myContext;
|
||||
@Autowired
|
||||
protected SearchBuilderFactory mySearchBuilderFactory;
|
||||
@Autowired
|
||||
private IBulkExportJobDao myBulkExportJobDao;
|
||||
@Autowired
|
||||
private MatchUrlService myMatchUrlService;
|
||||
|
||||
private ISearchBuilder mySearchBuilder;
|
||||
private BulkExportJobEntity myJobEntity;
|
||||
private Iterator<ResourcePersistentId> myPidIterator;
|
||||
private RuntimeResourceDefinition myResourceDefinition;
|
||||
|
||||
private Iterator<ResourcePersistentId> myPidIterator;
|
||||
private RuntimeSearchParam myPatientSearchParam;
|
||||
|
||||
/**
|
||||
* Get and cache an ISearchBuilder for the given resource type this partition is responsible for.
|
||||
*/
|
||||
protected ISearchBuilder getSearchBuilderForLocalResourceType() {
|
||||
if (mySearchBuilder == null) {
|
||||
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(myResourceType);
|
||||
RuntimeResourceDefinition def = myContext.getResourceDefinition(myResourceType);
|
||||
Class<? extends IBaseResource> nextTypeClass = def.getImplementingClass();
|
||||
mySearchBuilder = mySearchBuilderFactory.newSearchBuilder(dao, myResourceType, nextTypeClass);
|
||||
}
|
||||
return mySearchBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the list of pids of all resources of the given myResourceType, which reference any group member of the given myGroupId.
|
||||
* Generate the list of PIDs of all resources of the given myResourceType, which reference any group member of the given myGroupId.
|
||||
* Store them in a member iterator.
|
||||
*/
|
||||
protected void loadResourcePids() {
|
||||
//Initialize an array to hold the pids of the target resources to be exported.
|
||||
protected void loadResourcePIDs() {
|
||||
//Initialize an array to hold the PIDs of the target resources to be exported.
|
||||
myPidIterator = getResourcePidIterator();
|
||||
}
|
||||
|
||||
protected abstract Iterator<ResourcePersistentId> getResourcePidIterator();
|
||||
|
||||
protected abstract String[] getTypeFilterList();
|
||||
|
||||
protected abstract Date getSinceDate();
|
||||
|
||||
protected abstract String getLogInfoForRead();
|
||||
|
||||
protected List<SearchParameterMap> createSearchParameterMapsForResourceType() {
|
||||
BulkExportJobEntity jobEntity = getJobEntity();
|
||||
RuntimeResourceDefinition theDef = getResourceDefinition();
|
||||
Map<String, String[]> requestUrl = UrlUtil.parseQueryStrings(jobEntity.getRequest());
|
||||
String[] typeFilters = requestUrl.get(JpaConstants.PARAM_EXPORT_TYPE_FILTER);
|
||||
String[] typeFilters = getTypeFilterList();
|
||||
List<SearchParameterMap> spMaps = null;
|
||||
if (typeFilters != null) {
|
||||
spMaps = Arrays.stream(typeFilters)
|
||||
|
@ -129,8 +95,8 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
|
|||
|
||||
private void enhanceSearchParameterMapWithCommonParameters(SearchParameterMap map) {
|
||||
map.setLoadSynchronous(true);
|
||||
if (getJobEntity().getSince() != null) {
|
||||
map.setLastUpdated(new DateRangeParam(getJobEntity().getSince(), null));
|
||||
if (getSinceDate() != null) {
|
||||
map.setLastUpdated(new DateRangeParam(getSinceDate(), null));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,26 +113,12 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
|
|||
return myResourceDefinition;
|
||||
}
|
||||
|
||||
protected BulkExportJobEntity getJobEntity() {
|
||||
if (myJobEntity == null) {
|
||||
Optional<BulkExportJobEntity> jobOpt = myBulkExportJobDao.findByJobId(myJobUUID);
|
||||
if (jobOpt.isPresent()) {
|
||||
myJobEntity = jobOpt.get();
|
||||
} else {
|
||||
String errorMessage = String.format("Job with UUID %s does not exist!", myJobUUID);
|
||||
throw new IllegalStateException(errorMessage);
|
||||
}
|
||||
}
|
||||
return myJobEntity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResourcePersistentId> read() {
|
||||
|
||||
ourLog.info("Bulk export starting generation for batch export job: [{}] with resourceType [{}] and UUID [{}]", getJobEntity(), myResourceType, myJobUUID);
|
||||
ourLog.info(getLogInfoForRead());
|
||||
|
||||
if (myPidIterator == null) {
|
||||
loadResourcePids();
|
||||
loadResourcePIDs();
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
|
@ -177,18 +129,5 @@ public abstract class BaseBulkItemReader implements ItemReader<List<ResourcePers
|
|||
}
|
||||
|
||||
return outgoing.size() == 0 ? null : outgoing;
|
||||
|
||||
}
|
||||
|
||||
protected RuntimeSearchParam getPatientSearchParamForCurrentResourceType() {
|
||||
if (myPatientSearchParam == null) {
|
||||
Optional<RuntimeSearchParam> onlyPatientSearchParamForResourceType = SearchParameterUtil.getOnlyPatientSearchParamForResourceType(myContext, myResourceType);
|
||||
if (onlyPatientSearchParamForResourceType.isPresent()) {
|
||||
myPatientSearchParam = onlyPatientSearchParamForResourceType.get();
|
||||
} else {
|
||||
|
||||
}
|
||||
}
|
||||
return myPatientSearchParam;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
package ca.uhn.fhir.jpa.bulk.export.job;
|
||||
|
||||
/*-
|
||||
* #%L
|
||||
* HAPI FHIR Storage api
|
||||
* %%
|
||||
* Copyright (C) 2014 - 2021 Smile CDR, Inc.
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
|
||||
import ca.uhn.fhir.jpa.batch.log.Logs;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.util.BinaryUtil;
|
||||
import org.hl7.fhir.instance.model.api.IBaseBinary;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public abstract class BaseResourceToFileWriter implements ItemWriter<List<IBaseResource>> {
|
||||
protected static final Logger ourLog = Logs.getBatchTroubleshootingLog();
|
||||
|
||||
protected FhirContext myFhirContext;
|
||||
|
||||
protected DaoRegistry myDaoRegistry;
|
||||
|
||||
protected ByteArrayOutputStream myOutputStream;
|
||||
|
||||
@Value("#{stepExecutionContext['bulkExportCollectionEntityId']}")
|
||||
protected Long myBulkExportCollectionEntityId;
|
||||
|
||||
@Value("#{stepExecutionContext['resourceType']}")
|
||||
protected String myResourceType;
|
||||
|
||||
protected IFhirResourceDao myBinaryDao;
|
||||
private final OutputStreamWriter myWriter;
|
||||
private final IParser myParser;
|
||||
|
||||
protected BaseResourceToFileWriter(FhirContext theFhirContext, DaoRegistry theDaoRegistry) {
|
||||
myFhirContext = theFhirContext;
|
||||
myDaoRegistry = theDaoRegistry;
|
||||
myParser = myFhirContext.newJsonParser().setPrettyPrint(false);
|
||||
myOutputStream = new ByteArrayOutputStream();
|
||||
myWriter = new OutputStreamWriter(myOutputStream, Constants.CHARSET_UTF8);
|
||||
}
|
||||
|
||||
|
||||
protected IIdType createBinaryFromOutputStream() {
|
||||
IBaseBinary binary = BinaryUtil.newBinary(myFhirContext);
|
||||
binary.setContentType(Constants.CT_FHIR_NDJSON);
|
||||
binary.setContent(myOutputStream.toByteArray());
|
||||
DaoMethodOutcome outcome = myBinaryDao.create(binary, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.defaultPartition()));
|
||||
return outcome.getResource().getIdElement();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(List<? extends List<IBaseResource>> theList) throws Exception {
|
||||
|
||||
int count = 0;
|
||||
for (List<IBaseResource> resourceList : theList) {
|
||||
for (IBaseResource nextFileResource : resourceList) {
|
||||
myParser.encodeResourceToWriter(nextFileResource, myWriter);
|
||||
myWriter.append("\n");
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<IIdType> createdId = flushToFiles();
|
||||
if (createdId.isPresent()) {
|
||||
ourLog.info("Created {} resources for bulk export file containing {} resources of type {} ", count, createdId.get().toUnqualifiedVersionless().getValue(), myResourceType);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Optional<IIdType> flushToFiles();
|
||||
|
||||
}
|
|
@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.bulk.export.model;
|
|||
*/
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||
import org.springframework.batch.core.BatchStatus;
|
||||
|
||||
@JsonFormat(shape = JsonFormat.Shape.STRING)
|
||||
public enum BulkExportJobStatusEnum {
|
||||
|
@ -32,6 +33,19 @@ public enum BulkExportJobStatusEnum {
|
|||
SUBMITTED,
|
||||
BUILDING,
|
||||
COMPLETE,
|
||||
ERROR
|
||||
ERROR;
|
||||
|
||||
public static BulkExportJobStatusEnum fromBatchStatus(BatchStatus status) {
|
||||
switch (status) {
|
||||
case STARTING :
|
||||
return BulkExportJobStatusEnum.SUBMITTED;
|
||||
case COMPLETED :
|
||||
return BulkExportJobStatusEnum.COMPLETE;
|
||||
case STARTED :
|
||||
return BulkExportJobStatusEnum.BUILDING;
|
||||
default :
|
||||
return BulkExportJobStatusEnum.ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package ca.uhn.fhir.jpa.bulk.model;
|
||||
|
||||
import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.batch.core.BatchStatus;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class BulkExportStatusEnumTest {
|
||||
@Test
|
||||
public void testFromBatchStatus(){
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STARTED), BulkExportJobStatusEnum.BUILDING);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STARTING), BulkExportJobStatusEnum.SUBMITTED);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.COMPLETED), BulkExportJobStatusEnum.COMPLETE);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STOPPING), BulkExportJobStatusEnum.ERROR);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.STOPPED), BulkExportJobStatusEnum.ERROR);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.FAILED), BulkExportJobStatusEnum.ERROR);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.ABANDONED), BulkExportJobStatusEnum.ERROR);
|
||||
assertEquals(BulkExportJobStatusEnum.fromBatchStatus(BatchStatus.UNKNOWN), BulkExportJobStatusEnum.ERROR);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue