Reduce load on DB for bulk export
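In brief, per the diffs below: bulk export PID expansion now runs inside an explicitly read-only transaction (replacing a default read-write @Transactional), the Hibernate scroll query ignores the second-level cache and loads rows read-only, and each job/chunk logs its query and its progress every 1000 PIDs so long-running exports are observable.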

This commit is contained in:
James Agnew 2023-04-22 16:11:17 -04:00
parent 748e86cb04
commit 6975e26814
7 changed files with 195 additions and 77 deletions

View File: JpaBulkExportProcessor.java

@@ -37,9 +37,9 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.QueryChunker;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
@@ -47,6 +47,7 @@ import ca.uhn.fhir.mdm.dao.IMdmLinkDao;
import ca.uhn.fhir.mdm.model.MdmPidTuple;
import ca.uhn.fhir.model.api.Include;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.param.HasOrListParam;
import ca.uhn.fhir.rest.param.HasParam;
@@ -83,26 +84,19 @@ import static ca.uhn.fhir.rest.api.Constants.PARAM_HAS;
import static ca.uhn.fhir.rest.api.Constants.PARAM_ID;
public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class);
public static final int QUERY_CHUNK_SIZE = 100;
public static final List<String> PATIENT_BULK_EXPORT_FORWARD_REFERENCE_RESOURCE_TYPES = List.of("Practitioner", "Organization");
@Autowired
private FhirContext myContext;
@Autowired
private BulkExportHelperService myBulkExportHelperSvc;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private DaoRegistry myDaoRegistry;
private static final Logger ourLog = LoggerFactory.getLogger(JpaBulkExportProcessor.class);
@Autowired
protected SearchBuilderFactory<JpaPid> mySearchBuilderFactory;
@Autowired
private FhirContext myContext;
@Autowired
private BulkExportHelperService myBulkExportHelperSvc;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IIdHelperService<JpaPid> myIdHelperService;
@@ -115,59 +109,11 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
@Autowired
private EntityManager myEntityManager;
@Autowired
private IHapiTransactionService myHapiTransactionService;
private IFhirPath myFhirPath;
@Transactional
@Override
public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) {
String resourceType = theParams.getResourceType();
String jobId = theParams.getJobId();
RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
LinkedHashSet<JpaPid> pids;
if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.PATIENT) {
pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, def);
} else if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.GROUP) {
pids = getPidsForGroupStyleExport(theParams, resourceType, def);
} else {
pids = getPidsForSystemStyleExport(theParams, jobId, def);
}
ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size());
return pids.iterator();
}
private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(ExportPIDIteratorParameters theParams, String resourceType, String jobId, RuntimeResourceDefinition def) {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
// Patient
if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
ourLog.error(errorMessage);
throw new IllegalStateException(Msg.code(797) + errorMessage);
}
Set<String> patientSearchParams = SearchParameterUtil.getPatientSearchParamsForResourceType(myContext, theParams.getResourceType());
for (String patientSearchParam : patientSearchParams) {
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams, false);
for (SearchParameterMap map : maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParametersForPatient(map, theParams);
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
filterBySpecificPatient(theParams, resourceType, patientSearchParam, map);
SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, jobId);
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map, searchRuntime, null, RequestPartitionId.allPartitions());
while (resultIterator.hasNext()) {
pids.add(resultIterator.next());
}
}
}
return pids;
}
private static void filterBySpecificPatient(ExportPIDIteratorParameters theParams, String resourceType, String patientSearchParam, SearchParameterMap map) {
if (resourceType.equalsIgnoreCase("Patient")) {
if (theParams.getPatientIds() != null) {
@@ -193,19 +139,90 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
return referenceOrListParam;
}
private LinkedHashSet<JpaPid> getPidsForSystemStyleExport(ExportPIDIteratorParameters theParams, String theJobId, RuntimeResourceDefinition theDef) {
@Override
public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) {
return myHapiTransactionService
.withRequest(null)
.readOnly()
.execute(() -> {
String resourceType = theParams.getResourceType();
String jobId = theParams.getJobId();
String chunkId = theParams.getChunkId();
RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
LinkedHashSet<JpaPid> pids;
if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.PATIENT) {
pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, chunkId, def);
} else if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.GROUP) {
pids = getPidsForGroupStyleExport(theParams, resourceType, def);
} else {
pids = getPidsForSystemStyleExport(theParams, jobId, chunkId, def);
}
ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size());
return pids.iterator();
});
}
private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(ExportPIDIteratorParameters theParams, String resourceType, String theJobId, String theChunkId, RuntimeResourceDefinition def) {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
// Patient
if (myDaoConfig.getIndexMissingFields() == DaoConfig.IndexEnabledEnum.DISABLED) {
String errorMessage = "You attempted to start a Patient Bulk Export, but the system has `Index Missing Fields` disabled. It must be enabled for Patient Bulk Export";
ourLog.error(errorMessage);
throw new IllegalStateException(Msg.code(797) + errorMessage);
}
Set<String> patientSearchParams = SearchParameterUtil.getPatientSearchParamsForResourceType(myContext, theParams.getResourceType());
for (String patientSearchParam : patientSearchParams) {
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(def, theParams, false);
for (SearchParameterMap map : maps) {
//Ensure users did not monkey with the patient compartment search parameter.
validateSearchParametersForPatient(map, theParams);
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
filterBySpecificPatient(theParams, resourceType, patientSearchParam, map);
SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, theJobId);
ourLog.info("Executing query for bulk export job[{}] chunk[{}]: {}", theJobId, theChunkId, map.toNormalizedQueryString(myContext));
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map, searchRuntime, null, RequestPartitionId.allPartitions());
int pidCount = 0;
while (resultIterator.hasNext()) {
if (pidCount % 1000 == 0) {
ourLog.info("Bulk export job[{}] chunk[{}] has loaded {} pids", theJobId, theChunkId, pidCount);
}
pidCount++;
pids.add(resultIterator.next());
}
}
}
return pids;
}
private LinkedHashSet<JpaPid> getPidsForSystemStyleExport(ExportPIDIteratorParameters theParams, String theJobId, String theChunkId, RuntimeResourceDefinition theDef) {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
// System
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true);
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
for (SearchParameterMap map : maps) {
ourLog.info("Executing query for bulk export job[{}] chunk[{}]: {}", theJobId, theChunkId, map.toNormalizedQueryString(myContext));
// requires a transaction
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theJobId),
null,
RequestPartitionId.allPartitions());
int pidCount = 0;
while (resultIterator.hasNext()) {
if (pidCount % 1000 == 0) {
ourLog.info("Bulk export job[{}] chunk[{}] has loaded {} pids", theJobId, theChunkId, pidCount);
}
pidCount++;
pids.add(resultIterator.next());
}
}
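For context: the old getResourcePidIterator() was annotated @Transactional, which opens a read-write transaction by default; the new version runs the whole PID expansion through myHapiTransactionService.withRequest(null).readOnly().execute(...). A minimal sketch of the same read-only idea in plain Spring, assuming a PlatformTransactionManager is available and using a hypothetical loadPids() stand-in for the real search:

import java.util.List;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

// Illustrative sketch only -- not HAPI code. The transaction manager and
// loadPids() helper are placeholder names.
class ReadOnlyTxSketch {
	List<Long> expandPidsReadOnly(PlatformTransactionManager theTxManager) {
		TransactionTemplate tx = new TransactionTemplate(theTxManager);
		tx.setReadOnly(true); // read-only: Hibernate can skip dirty checking and the commit-time flush
		return tx.execute(status -> loadPids());
	}

	private List<Long> loadPids() {
		return List.of(); // stand-in for the actual PID query
	}
}

Marking the transaction read-only matters here because a single export chunk can walk a very large number of rows; in a read-only transaction neither Hibernate nor the database has to track them for modification.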

View File: SearchQueryExecutor.java

@@ -26,6 +26,7 @@ import ca.uhn.fhir.jpa.util.ScrollableResultsIterator;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.IoUtil;
import org.apache.commons.lang3.Validate;
import org.hibernate.CacheMode;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.slf4j.Logger;
@@ -119,6 +120,11 @@ public class SearchQueryExecutor implements ISearchQueryExecutor {
ourLog.trace("About to execute SQL: {}", sql);
hibernateQuery.setFetchSize(1000);
hibernateQuery.setCacheable(false);
hibernateQuery.setCacheMode(CacheMode.IGNORE);
hibernateQuery.setReadOnly(true);
ScrollableResults scrollableResults = hibernateQuery.scroll(ScrollMode.FORWARD_ONLY);
myResultSet = new ScrollableResultsIterator<>(scrollableResults);
myQueryInitialized = true;
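The four query settings above are where the load reduction comes from: the 1000-row fetch size streams results in batches, setCacheable(false) bypasses the query cache, CacheMode.IGNORE neither reads nor populates the second-level cache, and setReadOnly(true) avoids keeping a dirty-check snapshot for every loaded row. A Hibernate 5-style sketch of the same pattern, assuming a SessionFactory; the entity and property names are placeholders:

import org.hibernate.CacheMode;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.Session;
import org.hibernate.SessionFactory;

// Illustrative sketch only -- not HAPI code.
class ScrollSketch {
	void streamPids(SessionFactory theSessionFactory) {
		Session session = theSessionFactory.openSession();
		try {
			org.hibernate.query.Query<Long> query = session.createQuery(
				"select r.myId from ResourceTable r", Long.class);
			query.setFetchSize(1000);             // pull rows from the DB in batches
			query.setCacheable(false);            // skip the query cache
			query.setCacheMode(CacheMode.IGNORE); // don't read or populate the second-level cache
			query.setReadOnly(true);              // no dirty-check snapshots for loaded rows
			ScrollableResults results = query.scroll(ScrollMode.FORWARD_ONLY);
			try {
				while (results.next()) {
					Long pid = (Long) results.get(0); // Hibernate 5-style positional access
					// hand the pid off to the export pipeline...
				}
			} finally {
				results.close();
			}
		} finally {
			session.close();
		}
	}
}

FORWARD_ONLY scrolling is what keeps memory flat: the driver can stream the result set rather than buffering all of it, so cost no longer grows with the number of PIDs in a chunk.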

View File: ExpandResourcesStepJpaTest.java

@@ -8,23 +8,17 @@ import ca.uhn.fhir.batch2.jobs.export.models.ExpandedResourcesList;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.jobs.models.BatchResourceId;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.server.interceptor.ResponseSizeCapturingInterceptor;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvFileSource;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -73,7 +67,7 @@ public class ExpandResourcesStepJpaTest extends BaseJpaR4Test {
ResourceIdList resourceList = new ResourceIdList();
resourceList.setResourceType("Patient");
resourceList.setIds(ids.stream().map(t->new BatchResourceId().setResourceType("Patient").setId(Long.toString(t))).toList());
resourceList.setIds(ids.stream().map(t -> new BatchResourceId().setResourceType("Patient").setId(Long.toString(t))).toList());
BulkExportJobParameters params = new BulkExportJobParameters();
JobInstance jobInstance = new JobInstance();
@@ -107,5 +101,4 @@
}
}

View File: FetchResourceIdsStepJpaTest.java

@@ -0,0 +1,79 @@
package ca.uhn.fhir.jpa.bulk.export;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.export.FetchResourceIdsStep;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsTest;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import org.hl7.fhir.r4.model.DateTimeType;
import org.hl7.fhir.r4.model.OrganizationAffiliation;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class FetchResourceIdsStepJpaTest extends BaseJpaR4Test {
@Autowired
private FetchResourceIdsStep myFetchResourceIdsStep;
@Mock
private IJobDataSink<ResourceIdList> mySink;
@Captor
private ArgumentCaptor<ResourceIdList> myResourceIdListCaptor;
@Override
public void afterCleanupDao() {
super.afterCleanupDao();
myDaoConfig.setTagStorageMode(new DaoConfig().getTagStorageMode());
}
@Test
public void testSystemBulkExportWithManyTags() {
myDaoConfig.setTagStorageMode(DaoConfig.TagStorageModeEnum.INLINE);
mySearchParameterDao.create(FhirResourceDaoR4TagsTest.createSecuritySearchParameter(myFhirContext), mySrd);
mySearchParamRegistry.forceRefresh();
for (int i = 0; i < 10; i++) {
OrganizationAffiliation orgAff = new OrganizationAffiliation();
orgAff.getMeta().addSecurity().setSystem("http://foo").setCode("01B0");
orgAff.setActive(true);
myOrganizationAffiliationDao.create(orgAff, mySrd);
}
BulkExportJobParameters params = new BulkExportJobParameters();
params.setResourceTypes(List.of("OrganizationAffiliation"));
params.setStartDate(new DateTimeType("2023-01-01").getValue());
params.setFilters(List.of("OrganizationAffiliation?_security=01B0,01G0,01B0,01C0,17D0,02I0,02I0,02E0,03J0,03A0,03A0,03D0,03K0,05C0,04B0,04P0,05H0,05C0,04B0,06S0,06B0,06E0,07B0,07B0,07D0,08B0,08N0,08B0,08D0,09B0,09B0,09D0,10G0,10P0,10P0,10E0,11B0,11M0,11D0,12B0,12B0,12H0,13B0,13C0,14B0,15B0,14B0,14D0,15E0,16B0,16B0,16M0,16D0,18M0,17B0,20B0,20D0,22N0,22P0,22Q0,22S0,22B0,22B0,22B0,23B0,23E0,25E0,25B0,26B0,26B0,26H0,27B0,27B0,27B0,27Q2,28J0,28G0,29M0,28G0,28F0,30B0,30B0,31H0,31H0,31B0,32S0,32Q0,32Q0,32Y0,32R0,33B0,33B0,33D0,34P0,34V0,34P0,34U0,34P0,35B0,35E0,36P0,36B0,36K0,36F0,37B0,37B0,37D0,38B0,38F0,39D0,42B0,39B0,71A0,72A0,39W0,42B0,39W0,39F0,42F0,71B0,72B0,46B0,46K0,46B0,46P0,46E0,47B0,47F0,35A0,29A0,49A0,50I0,52S0,50I0,52S0,51P0,49A0,49B0,52G0,50J0,52V0,54A0,54B0,55A0,55A0,55D0,56B0,56B0,56D0,57A0,57B0,58B0,58A0,58B0,58D0,59H0,59H0,59C0,60B0,60M0,60F0,61B0,61S0,61F0,62A0,63B0,63B0,63B0,65B0,67B0,65R0,65Q0,65E0,67E0,68P0,68Q0,69B0,69B0,69C0,70J0,70G0,70B0"));
VoidModel data = new VoidModel();
JobInstance instance = new JobInstance();
instance.setInstanceId("instance-id");
String chunkId = "chunk-id";
StepExecutionDetails<BulkExportJobParameters, VoidModel> executionDetails = new StepExecutionDetails<>(params, data, instance, chunkId);
myCaptureQueriesListener.clear();
// Test
myFetchResourceIdsStep.run(executionDetails, mySink);
// Verify
verify(mySink, times(1)).accept(myResourceIdListCaptor.capture());
ResourceIdList idList = myResourceIdListCaptor.getAllValues().get(0);
assertEquals(10, idList.getIds().size());
}
}

View File: FhirResourceDaoR4TagsTest.java

@@ -464,4 +464,18 @@ public class FhirResourceDaoR4TagsTest extends BaseResourceProviderR4Test {
return meta.getProfile().stream().map(t -> t.getValue()).collect(Collectors.toList());
}
@Nonnull
public static SearchParameter createSecuritySearchParameter(FhirContext fhirContext) {
SearchParameter searchParameter = new SearchParameter();
searchParameter.setId("SearchParameter/resource-security");
for (String next : fhirContext.getResourceTypes().stream().sorted().collect(Collectors.toList())) {
searchParameter.addBase(next);
}
searchParameter.setStatus(Enumerations.PublicationStatus.ACTIVE);
searchParameter.setType(Enumerations.SearchParamType.TOKEN);
searchParameter.setCode("_security");
searchParameter.setName("Security");
searchParameter.setExpression("meta.security");
return searchParameter;
}
}

View File: FetchResourceIdsStep.java

@@ -71,6 +71,7 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
providerParams.setGroupId(params.getGroupId());
providerParams.setPatientIds(params.getPatientIds());
providerParams.setExpandMdm(params.isExpandMdm());
providerParams.setChunkId(theStepExecutionDetails.getChunkId());
int submissionCount = 0;
try {

View File: ExportPIDIteratorParameters.java

@@ -72,6 +72,7 @@ public class ExportPIDIteratorParameters {
* The patient id
*/
private List<String> myPatientIds;
private String myChunkId;
public String getResourceType() {
return myResourceType;
@@ -142,4 +143,11 @@ public class ExportPIDIteratorParameters {
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
public void setChunkId(String theChunkId) {
myChunkId = theChunkId;
}
public String getChunkId() {
return myChunkId;
}
}