Subscription retriggering enhancements (#4758)

* Work on forward porting reindex optimize storage

* Subscription retriggering performance enhancements

* Adjust readmes

* Add a test

* Add test

* Cleanup

* Test cleanup

* Add comment

* Add changelogs

* Test fixes

* Test fixes

* Bump size

* Fixes

* Test logging

* Restore forkcount

* Test fix

* Test fix

* Version bump

* Clean up changelogs

* Address review comment
This commit is contained in:
James Agnew 2023-04-28 12:06:38 -04:00 committed by GitHub
parent 78ff58277a
commit 69a79e39a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 1106 additions and 327 deletions

View File

@ -26,6 +26,8 @@ public class Logs {
private static final Logger ourBatchTroubleshootingLog = LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting");
private static final Logger ourNarrativeGenerationTroubleshootingLog = LoggerFactory.getLogger("ca.uhn.fhir.log.narrative_generation_troubleshooting");
private static final Logger ourSubscriptionTroubleshootingLog = LoggerFactory.getLogger("ca.cdr.log.subscription_troubleshooting");
public static Logger getBatchTroubleshootingLog() {
return ourBatchTroubleshootingLog;
}
@ -33,4 +35,8 @@ public class Logs {
public static Logger getNarrativeGenerationTroubleshootingLog() {
return ourBatchTroubleshootingLog;
}
public static Logger getSubscriptionTroubleshootingLog() {
return ourSubscriptionTroubleshootingLog;
}
}

View File

@ -3,4 +3,5 @@ type: perf
issue: 4742
title: "The SQL generated for the JPA server `$trigger-subscription` operation has been
optimized in order to drastically reduce the number of database round trips for large
triggering jobs."
triggering jobs. In addition, a bug prevented the subscription retriggering from fully
executing in a multithreaded way. This has been corrected."

View File

@ -0,0 +1,8 @@
---
type: add
issue: 4758
title: "The $reindex operation now supports several new optional parameters. 'optimizeStorage' can
be used to migrate data to/from inline storage mode, and `optimisticLock` can be used to
control whether to guard against concurrent data modification during reindexing.
`reindexSearchParameters` can be used to contron whether search parameters are
reindexed."

View File

@ -0,0 +1,6 @@
---
type: add
issue: 4758
title: "The JPA server REST-HOOK subscription delivery client had a hardcoded connection pool
limit of 20 outgoing connections. This has been increased to 1000, since subscriptions
themselves and their delivery channels already act as a rate-limiting step."

View File

@ -0,0 +1,5 @@
---
type: perf
issue: 4758
title: "When performing FHIR resource searches, the resource ID fetch size has been reduced. This
should improve the memory impact of performing searches which return very large numbers of resources."

View File

@ -0,0 +1,5 @@
---
type: add
issue: 4758
title: "An unexpected failure to reindex a single resource during a reindex job will no
longer cause the entire job to fail."

View File

@ -36,6 +36,7 @@ import ca.uhn.fhir.jpa.dao.IResultIterator;
import ca.uhn.fhir.jpa.dao.ISearchBuilder;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.mdm.MdmExpansionCacheSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.search.SearchRuntimeDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -53,6 +54,7 @@ import ca.uhn.fhir.rest.param.ReferenceOrListParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
import ca.uhn.fhir.util.ExtensionUtil;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.SearchParameterUtil;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseExtension;
@ -62,7 +64,6 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.persistence.EntityManager;
@ -114,29 +115,37 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
@Autowired
private EntityManager myEntityManager;
@Autowired
private IHapiTransactionService myHapiTransactionService;
private IFhirPath myFhirPath;
@Transactional
@Override
public Iterator<JpaPid> getResourcePidIterator(ExportPIDIteratorParameters theParams) {
return myHapiTransactionService
.withRequest(null)
.readOnly()
.execute(() -> {
String resourceType = theParams.getResourceType();
String jobId = theParams.getJobId();
String jobId = theParams.getInstanceId();
String chunkId = theParams.getChunkId();
RuntimeResourceDefinition def = myContext.getResourceDefinition(resourceType);
LinkedHashSet<JpaPid> pids;
if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.PATIENT) {
pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, def);
pids = getPidsForPatientStyleExport(theParams, resourceType, jobId, chunkId, def);
} else if (theParams.getExportStyle() == BulkDataExportOptions.ExportStyle.GROUP) {
pids = getPidsForGroupStyleExport(theParams, resourceType, def);
} else {
pids = getPidsForSystemStyleExport(theParams, jobId, def);
pids = getPidsForSystemStyleExport(theParams, jobId, chunkId, def);
}
ourLog.debug("Finished expanding resource pids to export, size is {}", pids.size());
return pids.iterator();
});
}
private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(ExportPIDIteratorParameters theParams, String resourceType, String jobId, RuntimeResourceDefinition def) {
private LinkedHashSet<JpaPid> getPidsForPatientStyleExport(ExportPIDIteratorParameters theParams, String resourceType, String theJobId, String theChunkId, RuntimeResourceDefinition def) {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
// Patient
if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.DISABLED) {
@ -157,9 +166,17 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
filterBySpecificPatient(theParams, resourceType, patientSearchParam, map);
SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, jobId);
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map, searchRuntime, null, theParams.getPartitionIdOrAllPartitions());
SearchRuntimeDetails searchRuntime = new SearchRuntimeDetails(null, theJobId);
Logs.getBatchTroubleshootingLog().debug("Executing query for bulk export job[{}] chunk[{}]: {}", theJobId, theChunkId, map.toNormalizedQueryString(myContext));
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map, searchRuntime, new SystemRequestDetails(), theParams.getPartitionIdOrAllPartitions());
int pidCount = 0;
while (resultIterator.hasNext()) {
if (pidCount % 10000 == 0) {
Logs.getBatchTroubleshootingLog().debug("Bulk export job[{}] chunk[{}] has loaded {} pids", theJobId, theChunkId, pidCount);
}
pidCount++;
pids.add(resultIterator.next());
}
}
@ -192,19 +209,26 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
return referenceOrListParam;
}
private LinkedHashSet<JpaPid> getPidsForSystemStyleExport(ExportPIDIteratorParameters theParams, String theJobId, RuntimeResourceDefinition theDef) {
private LinkedHashSet<JpaPid> getPidsForSystemStyleExport(ExportPIDIteratorParameters theParams, String theJobId, String theChunkId, RuntimeResourceDefinition theDef) {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
// System
List<SearchParameterMap> maps = myBulkExportHelperSvc.createSearchParameterMapsForResourceType(theDef, theParams, true);
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType(theParams.getResourceType());
for (SearchParameterMap map : maps) {
Logs.getBatchTroubleshootingLog().debug("Executing query for bulk export job[{}] chunk[{}]: {}", theJobId, theChunkId, map.toNormalizedQueryString(myContext));
// requires a transaction
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theJobId),
null,
theParams.getPartitionIdOrAllPartitions());
int pidCount = 0;
while (resultIterator.hasNext()) {
if (pidCount % 10000 == 0) {
Logs.getBatchTroubleshootingLog().debug("Bulk export job[{}] chunk[{}] has loaded {} pids", theJobId, theChunkId, pidCount);
}
pidCount++;
pids.add(resultIterator.next());
}
}
@ -230,9 +254,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
LinkedHashSet<JpaPid> pids = new LinkedHashSet<>();
Set<JpaPid> expandedMemberResourceIds = expandAllPatientPidsFromGroup(theParams);
assert expandedMemberResourceIds != null && !expandedMemberResourceIds.isEmpty();
if (ourLog.isDebugEnabled()) {
ourLog.debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds);
}
Logs.getBatchTroubleshootingLog().debug("{} has been expanded to members:[{}]", theParams.getGroupId(), expandedMemberResourceIds);
//Next, let's search for the target resources, with their correct patient references, chunked.
//The results will be jammed into myReadPids
@ -350,9 +372,9 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
for (SearchParameterMap map : maps) {
ISearchBuilder<JpaPid> searchBuilder = getSearchBuilderForResourceType("Patient");
ourLog.debug("Searching for members of group {} with job id {} with map {}", theParameters.getGroupId(), theParameters.getJobId(), map);
ourLog.debug("Searching for members of group {} with job instance {} with map {}", theParameters.getGroupId(), theParameters.getInstanceId(), map);
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(map,
new SearchRuntimeDetails(null, theParameters.getJobId()),
new SearchRuntimeDetails(null, theParameters.getInstanceId()),
null,
theParameters.getPartitionIdOrAllPartitions());
@ -451,7 +473,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
//Execute query and all found pids to our local iterator.
RequestPartitionId partitionId = theParams.getPartitionIdOrAllPartitions();
IResultIterator<JpaPid> resultIterator = searchBuilder.createQuery(expandedSpMap,
new SearchRuntimeDetails(null, theParams.getJobId()),
new SearchRuntimeDetails(null, theParams.getInstanceId()),
null,
partitionId);
while (resultIterator.hasNext()) {
@ -461,7 +483,7 @@ public class JpaBulkExportProcessor implements IBulkExportProcessor<JpaPid> {
// add _include to results to support ONC
Set<Include> includes = Collections.singleton(new Include("*", true));
SystemRequestDetails requestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId);
Set<JpaPid> includeIds = searchBuilder.loadIncludes(myContext, myEntityManager, theReadPids, includes, false, expandedSpMap.getLastUpdated(), theParams.getJobId(), requestDetails, null);
Set<JpaPid> includeIds = searchBuilder.loadIncludes(myContext, myEntityManager, theReadPids, includes, false, expandedSpMap.getLastUpdated(), theParams.getInstanceId(), requestDetails, null);
// gets rid of the Patient duplicates
theReadPids.addAll(includeIds.stream().filter((id) -> !id.getResourceType().equals("Patient")).collect(Collectors.toSet()));
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.config.r4;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.ParserOptions;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
@ -52,6 +53,14 @@ public class FhirContextR4Config {
parserOptions.setDontStripVersionsFromReferencesAtPaths(DEFAULT_PRESERVE_VERSION_REFS_R4_AND_LATER);
}
// We use this context to create subscription deliveries and that kind of thing. It doesn't
// make much sense to let the HTTP client pool be a blocker since we have delivery queue
// sizing as the lever to affect that. So make the pool big enough that it shouldn't get
// in the way.
IRestfulClientFactory clientFactory = theFhirContext.getRestfulClientFactory();
clientFactory.setPoolMaxPerRoute(1000);
clientFactory.setPoolMaxTotal(1000);
return theFhirContext;
}
}

View File

@ -35,6 +35,8 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.ReindexOutcome;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.DeleteConflictList;
import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
@ -51,6 +53,7 @@ import ca.uhn.fhir.jpa.model.entity.BaseHasResource;
import ca.uhn.fhir.jpa.model.entity.BaseTag;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.PartitionablePartitionId;
import ca.uhn.fhir.jpa.model.entity.ResourceEncodingEnum;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
@ -68,9 +71,9 @@ import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.api.StorageResponseCodeEnum;
import ca.uhn.fhir.model.dstu2.resource.BaseResource;
import ca.uhn.fhir.model.dstu2.resource.ListResource;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
@ -94,7 +97,6 @@ import ca.uhn.fhir.rest.param.HistorySearchDateRangeParam;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.IRestfulServerDefaults;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
@ -126,6 +128,8 @@ import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@ -199,6 +203,36 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Autowired
private ResourceSearchUrlSvc myResourceSearchUrlSvc;
public static <T extends IBaseResource> T invokeStoragePreShowResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequest, T retVal) {
if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESHOW_RESOURCES, theInterceptorBroadcaster, theRequest)) {
SimplePreResourceShowDetails showDetails = new SimplePreResourceShowDetails(retVal);
HookParams params = new HookParams()
.add(IPreResourceShowDetails.class, showDetails)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
CompositeInterceptorBroadcaster.doCallHooks(theInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PRESHOW_RESOURCES, params);
//noinspection unchecked
retVal = (T) showDetails.getResource(0);//TODO GGG/JA : getting resource 0 is interesting. We apparently allow null values in the list. Should we?
return retVal;
} else {
return retVal;
}
}
public static void invokeStoragePreAccessResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequest, IIdType theId, IBaseResource theResource) {
if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, theInterceptorBroadcaster, theRequest)) {
SimplePreResourceAccessDetails accessDetails = new SimplePreResourceAccessDetails(theResource);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
CompositeInterceptorBroadcaster.doCallHooks(theInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
if (accessDetails.isDontReturnResourceAtIndex(0)) {
throw new ResourceNotFoundException(Msg.code(1995) + "Resource " + theId + " is not known");
}
}
}
@Override
protected HapiTransactionService getTransactionService() {
return myTransactionService;
@ -1281,27 +1315,74 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@SuppressWarnings("unchecked")
@Override
public void reindex(IResourcePersistentId thePid, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
public ReindexOutcome reindex(IResourcePersistentId thePid, ReindexParameters theReindexParameters, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
ReindexOutcome retVal = new ReindexOutcome();
JpaPid jpaPid = (JpaPid) thePid;
// Careful! Reindex only reads ResourceTable, but we tell Hibernate to check version
// to ensure Hibernate will catch concurrent updates (PUT/DELETE) elsewhere.
// Otherwise, we may index stale data. See #4584
// We use the main entity as the lock object since all the index rows hang off it.
ResourceTable entity =
myEntityManager.find(ResourceTable.class, jpaPid.getId(), LockModeType.OPTIMISTIC);
if (entity == null) {
ourLog.warn("Unable to find entity with PID: {}", jpaPid.getId());
return;
ResourceTable entity;
if (theReindexParameters.isOptimisticLock()) {
entity = myEntityManager.find(ResourceTable.class, jpaPid.getId(), LockModeType.OPTIMISTIC);
} else {
entity = myEntityManager.find(ResourceTable.class, jpaPid.getId());
}
if (entity == null) {
retVal.addWarning("Unable to find entity with PID: " + jpaPid.getId());
return retVal;
}
if (theReindexParameters.getReindexSearchParameters() == ReindexParameters.ReindexSearchParametersEnum.ALL) {
reindexSearchParameters(entity, retVal);
}
if (theReindexParameters.getOptimizeStorage() != ReindexParameters.OptimizeStorageModeEnum.NONE) {
reindexOptimizeStorage(entity, theReindexParameters.getOptimizeStorage());
}
return retVal;
}
@SuppressWarnings("unchecked")
private void reindexSearchParameters(ResourceTable entity, ReindexOutcome theReindexOutcome) {
try {
T resource = (T) myJpaStorageResourceParser.toResource(entity, false);
reindex(resource, entity);
} catch (BaseServerResponseException | DataFormatException e) {
} catch (Exception e) {
theReindexOutcome.addWarning("Failed to reindex resource " + entity.getIdDt() + ": " + e);
myResourceTableDao.updateIndexStatus(entity.getId(), INDEX_STATUS_INDEXING_FAILED);
throw e;
}
}
private void reindexOptimizeStorage(ResourceTable entity, ReindexParameters.OptimizeStorageModeEnum theOptimizeStorageMode) {
ResourceHistoryTable historyEntity = entity.getCurrentVersionEntity();
if (historyEntity != null) {
reindexOptimizeStorageHistoryEntity(entity, historyEntity);
if (theOptimizeStorageMode == ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS) {
int pageSize = 100;
for (int page = 0; ((long)page * pageSize) < entity.getVersion(); page++) {
Slice<ResourceHistoryTable> historyEntities = myResourceHistoryTableDao.findForResourceIdAndReturnEntities(PageRequest.of(page, pageSize), entity.getId(), historyEntity.getVersion());
for (ResourceHistoryTable next : historyEntities) {
reindexOptimizeStorageHistoryEntity(entity, next);
}
}
}
}
}
private void reindexOptimizeStorageHistoryEntity(ResourceTable entity, ResourceHistoryTable historyEntity) {
if (historyEntity.getEncoding() == ResourceEncodingEnum.JSONC || historyEntity.getEncoding() == ResourceEncodingEnum.JSON) {
byte[] resourceBytes = historyEntity.getResource();
if (resourceBytes != null) {
String resourceText = decodeResource(resourceBytes, historyEntity.getEncoding());
if (myStorageSettings.getInlineResourceTextBelowSize() > 0 && resourceText.length() < myStorageSettings.getInlineResourceTextBelowSize()) {
ourLog.debug("Storing text of resource {} version {} as inline VARCHAR", entity.getResourceId(), historyEntity.getVersion());
myResourceHistoryTableDao.setResourceTextVcForVersion(historyEntity.getId(), resourceText);
}
}
}
}
@ -1381,7 +1462,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return myEntityManager.find(ResourceTable.class, jpaPid.getId());
}
@Override
@Nonnull
protected ResourceTable readEntityLatestVersion(IIdType theId, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails) {
@ -1433,7 +1513,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
}
@Transactional
@Override
public void removeTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm) {
@ -1532,7 +1611,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
for (List<IQueryParameterType> orValues : andOrValues) {
List<IQueryParameterType> orList = new ArrayList<>();
for (IQueryParameterType value : orValues) {
orList.add(new HasParam("List", ListResource.SP_ITEM, ListResource.SP_RES_ID, value.getValueAsQueryToken(null)));
orList.add(new HasParam("List", ListResource.SP_ITEM, BaseResource.SP_RES_ID, value.getValueAsQueryToken(null)));
}
hasParamValues.add(orList);
}
@ -1958,37 +2037,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
myIdHelperService = theIdHelperService;
}
public static <T extends IBaseResource> T invokeStoragePreShowResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequest, T retVal) {
if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESHOW_RESOURCES, theInterceptorBroadcaster, theRequest)) {
SimplePreResourceShowDetails showDetails = new SimplePreResourceShowDetails(retVal);
HookParams params = new HookParams()
.add(IPreResourceShowDetails.class, showDetails)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
CompositeInterceptorBroadcaster.doCallHooks(theInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PRESHOW_RESOURCES, params);
//noinspection unchecked
retVal = (T) showDetails.getResource(0);//TODO GGG/JA : getting resource 0 is interesting. We apparently allow null values in the list. Should we?
return retVal;
} else {
return retVal;
}
}
public static void invokeStoragePreAccessResources(IInterceptorBroadcaster theInterceptorBroadcaster, RequestDetails theRequest, IIdType theId, IBaseResource theResource) {
if (CompositeInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PREACCESS_RESOURCES, theInterceptorBroadcaster, theRequest)) {
SimplePreResourceAccessDetails accessDetails = new SimplePreResourceAccessDetails(theResource);
HookParams params = new HookParams()
.add(IPreResourceAccessDetails.class, accessDetails)
.add(RequestDetails.class, theRequest)
.addIfMatchesType(ServletRequestDetails.class, theRequest);
CompositeInterceptorBroadcaster.doCallHooks(theInterceptorBroadcaster, theRequest, Pointcut.STORAGE_PREACCESS_RESOURCES, params);
if (accessDetails.isDontReturnResourceAtIndex(0)) {
throw new ResourceNotFoundException(Msg.code(1995) + "Resource " + theId + " is not known");
}
}
}
private static class IdChecker implements IValidatorModule {
private final ValidationModeEnum myMode;

View File

@ -34,6 +34,7 @@ import ca.uhn.fhir.jpa.dao.expunge.ExpungeService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.BaseHasResource;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
@ -55,6 +56,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nullable;
import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import javax.persistence.TypedQuery;
@ -168,7 +170,7 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
}
@Override
public <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds) {
public <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds, boolean thePreFetchIndexes) {
HapiTransactionService.requireTransaction();
List<Long> pids = theResolvedIds
.stream()
@ -194,47 +196,49 @@ public abstract class BaseHapiFhirSystemDao<T extends IBaseBundle, MT> extends B
List<Long> entityIds;
entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsStringPopulated()).map(t -> t.getId()).collect(Collectors.toList());
if (thePreFetchIndexes) {
entityIds = loadedResourceTableEntries.stream().filter(ResourceTable::isParamsStringPopulated).map(ResourceTable::getId).collect(Collectors.toList());
if (entityIds.size() > 0) {
preFetchIndexes(entityIds, "string", "myParamsString", null);
}
entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsTokenPopulated()).map(t -> t.getId()).collect(Collectors.toList());
entityIds = loadedResourceTableEntries.stream().filter(ResourceTable::isParamsTokenPopulated).map(ResourceTable::getId).collect(Collectors.toList());
if (entityIds.size() > 0) {
preFetchIndexes(entityIds, "token", "myParamsToken", null);
}
entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsDatePopulated()).map(t -> t.getId()).collect(Collectors.toList());
entityIds = loadedResourceTableEntries.stream().filter(ResourceTable::isParamsDatePopulated).map(ResourceTable::getId).collect(Collectors.toList());
if (entityIds.size() > 0) {
preFetchIndexes(entityIds, "date", "myParamsDate", null);
}
entityIds = loadedResourceTableEntries.stream().filter(t -> t.isParamsQuantityPopulated()).map(t -> t.getId()).collect(Collectors.toList());
entityIds = loadedResourceTableEntries.stream().filter(ResourceTable::isParamsQuantityPopulated).map(ResourceTable::getId).collect(Collectors.toList());
if (entityIds.size() > 0) {
preFetchIndexes(entityIds, "quantity", "myParamsQuantity", null);
}
entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasLinks()).map(t -> t.getId()).collect(Collectors.toList());
entityIds = loadedResourceTableEntries.stream().filter(ResourceTable::isHasLinks).map(ResourceTable::getId).collect(Collectors.toList());
if (entityIds.size() > 0) {
preFetchIndexes(entityIds, "resourceLinks", "myResourceLinks", null);
}
entityIds = loadedResourceTableEntries.stream().filter(t -> t.isHasTags()).map(t -> t.getId()).collect(Collectors.toList());
entityIds = loadedResourceTableEntries.stream().filter(BaseHasResource::isHasTags).map(ResourceTable::getId).collect(Collectors.toList());
if (entityIds.size() > 0) {
myResourceTagDao.findByResourceIds(entityIds);
preFetchIndexes(entityIds, "tags", "myTags", null);
}
entityIds = loadedResourceTableEntries.stream().map(t->t.getId()).collect(Collectors.toList());
entityIds = loadedResourceTableEntries.stream().map(ResourceTable::getId).collect(Collectors.toList());
if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.ENABLED) {
preFetchIndexes(entityIds, "searchParamPresence", "mySearchParamPresents", null);
}
}
new QueryChunker<ResourceTable>().chunk(loadedResourceTableEntries, SearchBuilder.getMaximumPageSize() / 2, entries -> {
Map<Long, ResourceTable> entities = entries
.stream()
.collect(Collectors.toMap(t -> t.getId(), t -> t));
.collect(Collectors.toMap(ResourceTable::getId, t -> t));
CriteriaBuilder b = myEntityManager.getCriteriaBuilder();
CriteriaQuery<ResourceHistoryTable> q = b.createQuery(ResourceHistoryTable.class);

View File

@ -165,7 +165,7 @@ public class TransactionProcessor extends BaseTransactionProcessor {
preFetchConditionalUrls(theTransactionDetails, theEntries, theVersionAdapter, theRequestPartitionId, idsToPreFetch);
IFhirSystemDao<?, ?> systemDao = myApplicationContext.getBean(IFhirSystemDao.class);
systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch));
systemDao.preFetchResources(JpaPid.fromLongList(idsToPreFetch), true);
}
private void preFetchResourcesById(TransactionDetails theTransactionDetails, List<IBase> theEntries, ITransactionProcessorVersionAdapter theVersionAdapter, RequestPartitionId theRequestPartitionId, Set<String> foundIds, List<Long> idsToPreFetch) {

View File

@ -38,13 +38,15 @@ public interface IResourceHistoryTableDao extends JpaRepository<ResourceHistoryT
@Query("SELECT t FROM ResourceHistoryTable t WHERE t.myResourceId = :resId ORDER BY t.myResourceVersion ASC")
List<ResourceHistoryTable> findAllVersionsForResourceIdInOrder(@Param("resId") Long theId);
@Query("SELECT t FROM ResourceHistoryTable t LEFT OUTER JOIN FETCH t.myProvenance WHERE t.myResourceId = :id AND t.myResourceVersion = :version")
ResourceHistoryTable findForIdAndVersionAndFetchProvenance(@Param("id") long theId, @Param("version") long theVersion);
@Query("SELECT t.myId FROM ResourceHistoryTable t WHERE t.myResourceId = :resId AND t.myResourceVersion != :dontWantVersion")
Slice<Long> findForResourceId(Pageable thePage, @Param("resId") Long theId, @Param("dontWantVersion") Long theDontWantVersion);
@Query("SELECT t FROM ResourceHistoryTable t WHERE t.myResourceId = :resId AND t.myResourceVersion != :dontWantVersion")
Slice<ResourceHistoryTable> findForResourceIdAndReturnEntities(Pageable thePage, @Param("resId") Long theId, @Param("dontWantVersion") Long theDontWantVersion);
@Query("" +
"SELECT v.myId FROM ResourceHistoryTable v " +
"LEFT OUTER JOIN ResourceTable t ON (v.myResourceId = t.myId) " +
@ -65,6 +67,13 @@ public interface IResourceHistoryTableDao extends JpaRepository<ResourceHistoryT
"WHERE v.myResourceVersion != t.myVersion")
Slice<Long> findIdsOfPreviousVersionsOfResources(Pageable thePage);
/**
* Sets the inline text and clears the LOB copy of the text
*/
@Modifying
@Query("UPDATE ResourceHistoryTable as t SET t.myResource = null, t.myResourceTextVc = :text WHERE t.myId = :pid")
void setResourceTextVcForVersion(@Param("pid") Long id, @Param("text") String resourceText);
@Modifying
@Query("UPDATE ResourceHistoryTable r SET r.myResourceVersion = :newVersion WHERE r.myResourceId = :id AND r.myResourceVersion = :oldVersion")
void updateVersion(@Param("id") long theId, @Param("oldVersion") long theOldVersion, @Param("newVersion") long theNewVersion);

View File

@ -20,11 +20,13 @@
package ca.uhn.fhir.jpa.search.builder.sql;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.search.builder.ISearchQueryExecutor;
import ca.uhn.fhir.jpa.util.ScrollableResultsIterator;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.IoUtil;
import org.apache.commons.lang3.Validate;
import org.hibernate.CacheMode;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.slf4j.Logger;
@ -109,7 +111,7 @@ public class SearchQueryExecutor implements ISearchQueryExecutor {
* Note that we use the spring managed connection, and the expectation is that a transaction that
* is managed by Spring has been started before this method is called.
*/
assert TransactionSynchronizationManager.isSynchronizationActive();
HapiTransactionService.requireTransaction();
Query nativeQuery = myEntityManager.createNativeQuery(sql);
org.hibernate.query.Query<?> hibernateQuery = (org.hibernate.query.Query<?>) nativeQuery;
@ -119,6 +121,15 @@ public class SearchQueryExecutor implements ISearchQueryExecutor {
ourLog.trace("About to execute SQL: {}", sql);
/*
* These settings help to ensure that we use a search cursor
* as opposed to loading all search results into memory
*/
hibernateQuery.setFetchSize(500000);
hibernateQuery.setCacheable(false);
hibernateQuery.setCacheMode(CacheMode.IGNORE);
hibernateQuery.setReadOnly(true);
// This tells hibernate not to flush when we call scroll(), but rather to wait until the transaction commits and
// only flush then. We need to do this so that any exceptions that happen in the transaction happen when
// we try to commit the transaction, and not here.

View File

@ -174,7 +174,7 @@ public class TermLoaderSvcImpl implements ITermLoaderSvc {
private static final int LOG_INCREMENT = 1000;
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(TermLoaderSvcImpl.class);
// FYI: Hardcoded to R4 because that's what the term svc uses internally
private final FhirContext myCtx = FhirContext.forR4();
private final FhirContext myCtx = FhirContext.forR4Cached();
private final ITermDeferredStorageSvc myDeferredStorageSvc;
private final ITermCodeSystemStorageSvc myCodeSystemStorageSvc;
@ -499,7 +499,7 @@ public class TermLoaderSvcImpl implements ITermLoaderSvc {
CodeSystem imgthlaCs;
try {
String imgthlaCsString = IOUtils.toString(TermReadSvcImpl.class.getResourceAsStream("/ca/uhn/fhir/jpa/term/imgthla/imgthla.xml"), Charsets.UTF_8);
imgthlaCs = FhirContext.forR4().newXmlParser().parseResource(CodeSystem.class, imgthlaCsString);
imgthlaCs = FhirContext.forR4Cached().newXmlParser().parseResource(CodeSystem.class, imgthlaCsString);
} catch (IOException e) {
throw new InternalErrorException(Msg.code(869) + "Failed to load imgthla.xml", e);
}
@ -602,7 +602,7 @@ public class TermLoaderSvcImpl implements ITermLoaderSvc {
throw new InvalidRequestException(Msg.code(875) + "Did not find loinc.xml in the ZIP distribution.");
}
CodeSystem loincCs = FhirContext.forR4().newXmlParser().parseResource(CodeSystem.class, loincCsString);
CodeSystem loincCs = FhirContext.forR4Cached().newXmlParser().parseResource(CodeSystem.class, loincCsString);
if (isNotBlank(loincCs.getVersion())) {
throw new InvalidRequestException(Msg.code(876) + "'loinc.xml' file must not have a version defined. To define a version use '" +
LOINC_CODESYSTEM_VERSION.getCode() + "' property of 'loincupload.properties' file");

View File

@ -22,6 +22,7 @@ import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.mdm.dao.IMdmLinkDao;
import ca.uhn.fhir.mdm.model.MdmPidTuple;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import ca.uhn.fhir.rest.api.server.storage.BaseResourcePersistentId;
@ -55,6 +56,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -64,9 +66,12 @@ import static org.mockito.AdditionalMatchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.hamcrest.Matchers.containsString;
@ExtendWith(MockitoExtension.class)
public class JpaBulkExportProcessorTest {
@ -97,7 +102,7 @@ public class JpaBulkExportProcessorTest {
}
@Override
public void close() throws IOException {
public void close() {
}
@ -144,7 +149,7 @@ public class JpaBulkExportProcessorTest {
private ExportPIDIteratorParameters createExportParameters(BulkDataExportOptions.ExportStyle theExportStyle) {
ExportPIDIteratorParameters parameters = new ExportPIDIteratorParameters();
parameters.setJobId("jobId");
parameters.setInstanceId("instanceId");
parameters.setExportStyle(theExportStyle);
if (theExportStyle == BulkDataExportOptions.ExportStyle.GROUP) {
parameters.setGroupId("123");
@ -204,10 +209,10 @@ public class JpaBulkExportProcessorTest {
.thenReturn(searchBuilder);
// ret
when(searchBuilder.createQuery(
eq(map),
any(SearchRuntimeDetails.class),
any(),
eq(getPartitionIdFromParams(thePartitioned))))
any(),
any(),
any()))
.thenReturn(resultIterator);
// test
@ -220,6 +225,12 @@ public class JpaBulkExportProcessorTest {
assertTrue(pidIterator.hasNext());
assertEquals(pid2, pidIterator.next());
assertFalse(pidIterator.hasNext());
verify(searchBuilder, times(1)).createQuery(
eq(map),
any(SearchRuntimeDetails.class),
nullable(RequestDetails.class),
eq(getPartitionIdFromParams(thePartitioned)));
}
private RequestPartitionId getPartitionIdFromParams(boolean thePartitioned) {
@ -244,8 +255,8 @@ public class JpaBulkExportProcessorTest {
try {
myProcessor.getResourcePidIterator(parameters);
fail();
} catch (IllegalStateException ex) {
assertTrue(ex.getMessage().contains("You attempted to start a Patient Bulk Export,"));
} catch (InternalErrorException ex) {
assertThat(ex.getMessage(), containsString("You attempted to start a Patient Bulk Export,"));
}
}

View File

@ -65,7 +65,7 @@ public class MemberMatcherR4HelperTest {
@RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) MemberMatcherR4Helper.ourLog, Level.TRACE);
@Spy
private final FhirContext myFhirContext = FhirContext.forR4();
private final FhirContext myFhirContext = FhirContext.forR4Cached();
@Mock
private IFhirResourceDao<Coverage> myCoverageDao;
@Mock

View File

@ -0,0 +1,13 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} [%file:%line] %msg%n</pattern>
</encoder>
</appender>
<!-- define the root first, so the rest can inherit our logger -->
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.hibernate.annotations.Columns;
import org.hibernate.annotations.OptimisticLock;
import javax.persistence.CascadeType;

View File

@ -43,6 +43,8 @@ import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.BundleUtil;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -105,7 +107,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
}
String payloadId = thePayloadResource.getIdElement().toUnqualified().getValue();
ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
StopWatch sw = new StopWatch();
try {
operation.execute();
@ -115,6 +117,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
throw e;
}
Logs.getSubscriptionTroubleshootingLog().debug("Delivered {} rest-hook payload {} for {} in {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue(), sw);
}
}

View File

@ -497,7 +497,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
}
};
myExecutorService = new ThreadPoolExecutor(
0,
10,
10,
0L,
TimeUnit.MILLISECONDS,

View File

@ -109,6 +109,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<excludes>
<exclude>**/stresstest/*</exclude>
</excludes>

View File

@ -237,7 +237,7 @@ public class BulkDataExportProviderTest {
assertTrue(params.getResourceTypes().contains(patientResource));
assertTrue(params.getResourceTypes().contains(practitionerResource));
assertEquals(Constants.CT_FHIR_NDJSON, params.getOutputFormat());
assertNotNull(params.getStartDate());
assertNotNull(params.getSince());
assertTrue(params.getFilters().contains(filter));
assertThat(params.getPostFetchFilterUrls(), contains("Patient?_tag=foo"));
}
@ -296,7 +296,7 @@ public class BulkDataExportProviderTest {
BulkExportParameters params = verifyJobStart();
assertEquals(Constants.CT_FHIR_NDJSON, params.getOutputFormat());
assertThat(params.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(params.getStartDate(), notNullValue());
assertThat(params.getSince(), notNullValue());
assertThat(params.getFilters(), containsInAnyOrder("Patient?identifier=foo"));
}
@ -325,7 +325,7 @@ public class BulkDataExportProviderTest {
BulkExportParameters params = verifyJobStart();
assertEquals(Constants.CT_FHIR_NDJSON, params.getOutputFormat());
assertThat(params.getResourceTypes(), containsInAnyOrder("Patient", "EpisodeOfCare"));
assertThat(params.getStartDate(), nullValue());
assertThat(params.getSince(), nullValue());
assertThat(params.getFilters(), containsInAnyOrder("Patient?_id=P999999990", "EpisodeOfCare?patient=P999999990"));
}
@ -610,7 +610,7 @@ public class BulkDataExportProviderTest {
assertEquals(Constants.CT_FHIR_NDJSON, bp.getOutputFormat());
assertThat(bp.getResourceTypes(), containsInAnyOrder("Observation", "DiagnosticReport"));
assertThat(bp.getStartDate(), notNullValue());
assertThat(bp.getSince(), notNullValue());
assertThat(bp.getFilters(), notNullValue());
assertEquals(GROUP_ID, bp.getGroupId());
assertThat(bp.isExpandMdm(), is(equalTo(true)));
@ -645,7 +645,7 @@ public class BulkDataExportProviderTest {
BulkExportParameters bp = verifyJobStart();
assertEquals(Constants.CT_FHIR_NDJSON, bp.getOutputFormat());
assertThat(bp.getResourceTypes(), containsInAnyOrder("Patient", "Practitioner"));
assertThat(bp.getStartDate(), notNullValue());
assertThat(bp.getSince(), notNullValue());
assertThat(bp.getFilters(), notNullValue());
assertEquals(GROUP_ID, bp.getGroupId());
assertThat(bp.isExpandMdm(), is(equalTo(true)));
@ -820,7 +820,7 @@ public class BulkDataExportProviderTest {
BulkExportParameters bp = verifyJobStart();
assertEquals(Constants.CT_FHIR_NDJSON, bp.getOutputFormat());
assertThat(bp.getResourceTypes(), containsInAnyOrder("Immunization", "Observation"));
assertThat(bp.getStartDate(), notNullValue());
assertThat(bp.getSince(), notNullValue());
assertThat(bp.getFilters(), containsInAnyOrder("Immunization?vaccine-code=foo"));
}

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.bulk;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.model.BulkExportJobResults;
import ca.uhn.fhir.jpa.api.svc.IBatch2JobRunner;
@ -30,6 +31,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
@ -690,14 +692,32 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
verifyBulkExportResults(options, expectedContainedIds, Collections.emptyList());
}
private void verifyBulkExportResults(BulkDataExportOptions theOptions, List<String> theContainedList, List<String> theExcludedList) {
@Test
public void testSystemBulkExport() {
List<String> expectedIds = new ArrayList<>();
for (int i = 0; i < 20; i++) {
expectedIds.add(createPatient(withActiveTrue()).getValue());
expectedIds.add(createObservation(withStatus("final")).getValue());
}
final BulkDataExportOptions options = new BulkDataExportOptions();
options.setResourceTypes(Set.of("Patient", "Observation"));
options.setFilters(Set.of("Patient?active=true", "Patient?active=false", "Observation?status=final"));
options.setExportStyle(BulkDataExportOptions.ExportStyle.SYSTEM);
options.setOutputFormat(Constants.CT_FHIR_NDJSON);
JobInstance finalJobInstance = verifyBulkExportResults(options, expectedIds, List.of());
assertEquals(40, finalJobInstance.getCombinedRecordsProcessed());
}
private JobInstance verifyBulkExportResults(BulkDataExportOptions theOptions, List<String> theContainedList, List<String> theExcludedList) {
Batch2JobStartResponse startResponse = myJobRunner.startNewJob(BulkExportUtils.createBulkExportJobParametersFromExportOptions(theOptions));
assertNotNull(startResponse);
assertFalse(startResponse.isUsesCachedResult());
// Run a scheduled pass to build the export
myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 120);
JobInstance jobInstance = myBatch2JobHelper.awaitJobCompletion(startResponse.getInstanceId(), 120);
await()
.atMost(200, TimeUnit.SECONDS)
@ -740,6 +760,8 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
}
}
return jobInstance;
}
for (String containedString : theContainedList) {
@ -748,6 +770,7 @@ public class BulkDataExportTest extends BaseResourceProviderR4Test {
for (String excludedString : theExcludedList) {
assertThat("Didn't want unexpected ID " + excludedString + " in IDS: " + foundIds, foundIds, not(hasItem(excludedString)));
}
return jobInstance;
}

View File

@ -0,0 +1,83 @@
package ca.uhn.fhir.jpa.bulk.export;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.export.FetchResourceIdsStep;
import ca.uhn.fhir.batch2.jobs.export.models.BulkExportJobParameters;
import ca.uhn.fhir.batch2.jobs.export.models.ResourceIdList;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoR4TagsTest;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.bulk.BulkDataExportOptions;
import org.hl7.fhir.r4.model.DateTimeType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.OrganizationAffiliation;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.HashSet;
import java.util.List;
public class FetchResourceIdsStepJpaTest extends BaseJpaR4Test {
@Autowired
private FetchResourceIdsStep myFetchResourceIdsStep;
@Mock
private IJobDataSink<ResourceIdList> mySink;
@Captor
private ArgumentCaptor<ResourceIdList> myResourceIdListCaptor;
@Override
public void afterCleanupDao() {
super.afterCleanupDao();
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
}
@Test
public void testSystemBulkExportWithManyTags() {
myStorageSettings.setTagStorageMode(JpaStorageSettings.TagStorageModeEnum.INLINE);
mySearchParameterDao.create(FhirResourceDaoR4TagsTest.createSecuritySearchParameter(myFhirContext), mySrd);
for (int i = 0; i < 10; i++) {
OrganizationAffiliation orgAff = new OrganizationAffiliation();
orgAff.getMeta().addSecurity().setSystem("http://foo").setCode("01B0");
orgAff.setActive(true);
myOrganizationAffiliationDao.create(orgAff, mySrd);
}
BulkExportJobParameters params = new BulkExportJobParameters();
params.setResourceTypes(List.of("OrganizationAffiliation"));
params.setSince(new DateTimeType("2023-01-01").getValue());
params.setFilters(List.of("OrganizationAffiliation?_security=01B0,01G0,01B0,01C0,17D0,02I0,02I0,02E0,03J0,03A0,03A0,03D0,03K0,05C0,04B0,04P0,05H0,05C0,04B0,06S0,06B0,06E0,07B0,07B0,07D0,08B0,08N0,08B0,08D0,09B0,09B0,09D0,10G0,10P0,10P0,10E0,11B0,11M0,11D0,12B0,12B0,12H0,13B0,13C0,14B0,15B0,14B0,14D0,15E0,16B0,16B0,16M0,16D0,18M0,17B0,20B0,20D0,22N0,22P0,22Q0,22S0,22B0,22B0,22B0,23B0,23E0,25E0,25B0,26B0,26B0,26H0,27B0,27B0,27B0,27Q2,28J0,28G0,29M0,28G0,28F0,30B0,30B0,31H0,31H0,31B0,32S0,32Q0,32Q0,32Y0,32R0,33B0,33B0,33D0,34P0,34V0,34P0,34U0,34P0,35B0,35E0,36P0,36B0,36K0,36F0,37B0,37B0,37D0,38B0,38F0,39D0,42B0,39B0,71A0,72A0,39W0,42B0,39W0,39F0,42F0,71B0,72B0,46B0,46K0,46B0,46P0,46E0,47B0,47F0,35A0,29A0,49A0,50I0,52S0,50I0,52S0,51P0,49A0,49B0,52G0,50J0,52V0,54A0,54B0,55A0,55A0,55D0,56B0,56B0,56D0,57A0,57B0,58B0,58A0,58B0,58D0,59H0,59H0,59C0,60B0,60M0,60F0,61B0,61S0,61F0,62A0,63B0,63B0,63B0,65B0,67B0,65R0,65Q0,65E0,67E0,68P0,68Q0,69B0,69B0,69C0,70J0,70G0,70B0"));
VoidModel data = new VoidModel();
JobInstance instance = new JobInstance();
instance.setInstanceId("instance-id");
String chunkId = "chunk-id";
StepExecutionDetails<BulkExportJobParameters, VoidModel> executionDetails = new StepExecutionDetails<>(params, data, instance, chunkId);
myCaptureQueriesListener.clear();
// Test
myFetchResourceIdsStep.run(executionDetails, mySink);
// Verify
verify(mySink, times(1)).accept(myResourceIdListCaptor.capture());
ResourceIdList idList = myResourceIdListCaptor.getAllValues().get(0);
assertEquals(10, idList.getIds().size());
}
}

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.bulk.imprt2;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.jobs.imprt.ConsumeFilesStep;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.r4.BasePartitioningR4Test;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.IdType;
@ -19,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.ServletException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -40,13 +42,16 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test {
public void before() throws ServletException {
super.before();
myPartitionSettings.setPartitioningEnabled(false);
myStorageSettings.setInlineResourceTextBelowSize(10000);
}
@AfterEach
@Override
public void after() {
super.after();
myStorageSettings.setInlineResourceTextBelowSize(new JpaStorageSettings().getInlineResourceTextBelowSize());
}
@Test
public void testAlreadyExisting_NoChanges() {
// Setup
@ -82,10 +87,10 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test {
// Validate
assertEquals(7, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(7, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countInsertQueriesForCurrentThread(), myCaptureQueriesListener.getInsertQueriesForCurrentThread().stream().map(t->t.getSql(true, false)).collect(Collectors.joining("\n")));
assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
@ -142,9 +147,9 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test {
// Validate
assertEquals(7, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(2, myCaptureQueriesListener.logInsertQueries());
assertEquals(4, myCaptureQueriesListener.logUpdateQueries());
assertEquals(7, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(2, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(4, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
@ -184,9 +189,9 @@ public class ConsumeFilesStepR4Test extends BasePartitioningR4Test {
assertThat(myCaptureQueriesListener.getSelectQueries().get(0).getSql(true, false),
either(containsString("forcedid0_.RESOURCE_TYPE='Patient' and forcedid0_.FORCED_ID='B' and (forcedid0_.PARTITION_ID is null) or forcedid0_.RESOURCE_TYPE='Patient' and forcedid0_.FORCED_ID='A' and (forcedid0_.PARTITION_ID is null)"))
.or(containsString("forcedid0_.RESOURCE_TYPE='Patient' and forcedid0_.FORCED_ID='A' and (forcedid0_.PARTITION_ID is null) or forcedid0_.RESOURCE_TYPE='Patient' and forcedid0_.FORCED_ID='B' and (forcedid0_.PARTITION_ID is null)")));
assertEquals(52, myCaptureQueriesListener.logInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(52, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());

View File

@ -1,14 +1,21 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.ValidationSupportContext;
import ca.uhn.fhir.context.support.ValueSetExpansionOptions;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
import ca.uhn.fhir.jpa.entity.TermValueSet;
import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
@ -34,9 +41,11 @@ import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.util.BundleBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CareTeam;
import org.hl7.fhir.r4.model.CodeSystem;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Coverage;
@ -48,6 +57,7 @@ import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Location;
import org.hl7.fhir.r4.model.Narrative;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Practitioner;
import org.hl7.fhir.r4.model.Provenance;
@ -57,9 +67,6 @@ import org.hl7.fhir.r4.model.ServiceRequest;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.hl7.fhir.r4.model.ValueSet;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Parameters;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
@ -67,6 +74,8 @@ import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
@ -89,8 +98,11 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
@ -109,14 +121,6 @@ import static org.mockito.Mockito.when;
@SuppressWarnings("JavadocBlankLines")
@TestMethodOrder(MethodOrderer.MethodName.class)
public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4QueryCountTest.class);
@Autowired
private ISearchParamPresentDao mySearchParamPresentDao;
@Autowired
private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@RegisterExtension
@Order(0)
public static final RestfulServerExtension ourServer = new RestfulServerExtension(FhirContext.forR4Cached())
@ -124,7 +128,15 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
@RegisterExtension
@Order(1)
public static final HashMapResourceProviderExtension<Patient> ourPatientProvider = new HashMapResourceProviderExtension<>(ourServer, Patient.class);
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoR4QueryCountTest.class);
@Autowired
private ISearchParamPresentDao mySearchParamPresentDao;
@Autowired
private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
private ReindexStep myReindexStep;
@AfterEach
public void afterResetDao() {
@ -142,6 +154,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
myStorageSettings.setPopulateIdentifierInAutoCreatedPlaceholderReferenceTargets(new JpaStorageSettings().isPopulateIdentifierInAutoCreatedPlaceholderReferenceTargets());
myStorageSettings.setResourceClientIdStrategy(new JpaStorageSettings().getResourceClientIdStrategy());
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
myStorageSettings.setInlineResourceTextBelowSize(new JpaStorageSettings().getInlineResourceTextBelowSize());
myStorageSettings.clearSupportedSubscriptionTypesForUnitTest();
TermReadSvcImpl.setForceDisableHibernateSearchForUnitTest(false);
@ -881,6 +894,64 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
}
@ParameterizedTest
@CsvSource({
// NoOp OptimisticLock OptimizeMode ExpectedSelect ExpectedUpdate
" false, false, CURRENT_VERSION, 2, 10",
" true, false, CURRENT_VERSION, 2, 0",
" false, true, CURRENT_VERSION, 12, 10",
" true, true, CURRENT_VERSION, 12, 0",
" false, false, ALL_VERSIONS, 22, 20",
" true, false, ALL_VERSIONS, 22, 0",
" false, true, ALL_VERSIONS, 32, 20",
" true, true, ALL_VERSIONS, 32, 0",
})
public void testReindexJob_OptimizeStorage(boolean theNoOp, boolean theOptimisticLock, ReindexParameters.OptimizeStorageModeEnum theOptimizeStorageModeEnum, int theExpectedSelectCount, int theExpectedUpdateCount) {
// Setup
// In no-op mode, the inlining is already in the state it needs to be in
if (theNoOp) {
myStorageSettings.setInlineResourceTextBelowSize(10000);
} else {
myStorageSettings.setInlineResourceTextBelowSize(0);
}
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
IIdType patientId = createPatient(withActiveTrue());
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId(patientId.toUnqualifiedVersionless());
p.setActive(true);
p.addIdentifier().setValue("" + i);
myPatientDao.update(p, mySrd);
}
data.addTypedPid("Patient", patientId.getIdPartAsLong());
for (int i = 0; i < 9; i++) {
IIdType nextPatientId = createPatient(withActiveTrue());
data.addTypedPid("Patient", nextPatientId.getIdPartAsLong());
}
myStorageSettings.setInlineResourceTextBelowSize(10000);
ReindexJobParameters params = new ReindexJobParameters()
.setOptimizeStorage(theOptimizeStorageModeEnum)
.setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.NONE)
.setOptimisticLock(theOptimisticLock);
// execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, mock(IJobDataSink.class), "123", "456", params);
// validate
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
assertEquals(theExpectedSelectCount, myCaptureQueriesListener.getSelectQueriesForCurrentThread().size());
assertEquals(theExpectedUpdateCount, myCaptureQueriesListener.getUpdateQueriesForCurrentThread().size());
assertEquals(0, myCaptureQueriesListener.getInsertQueriesForCurrentThread().size());
assertEquals(0, myCaptureQueriesListener.getDeleteQueriesForCurrentThread().size());
assertEquals(10, outcome.getRecordsProcessed());
}
public void assertNoPartitionSelectors() {
List<SqlQuery> selectQueries = myCaptureQueriesListener.getSelectQueriesForCurrentThread();
for (SqlQuery next : selectQueries) {
@ -2776,12 +2847,21 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
}
myCaptureQueriesListener.clear();
mySystemDao.transaction(mySrd, input);
Bundle output = mySystemDao.transaction(mySrd, input);
myCaptureQueriesListener.logSelectQueriesForCurrentThread();
assertEquals(0, myCaptureQueriesListener.countSelectQueriesForCurrentThread());
assertEquals(45, myCaptureQueriesListener.countInsertQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countUpdateQueriesForCurrentThread());
assertEquals(0, myCaptureQueriesListener.countDeleteQueriesForCurrentThread());
assertEquals(1, myCaptureQueriesListener.countCommits());
assertEquals(0, myCaptureQueriesListener.countRollbacks());
assertEquals(input.getEntry().size(), output.getEntry().size());
runInTransaction(() -> {
assertEquals(10, myResourceTableDao.count());
assertEquals(10, myResourceHistoryTableDao.count());
});
}
@ -2791,7 +2871,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
* as well as a large number of updates (PUT). This means that a lot of URLs and resources
* need to be resolved (ie SQL SELECT) in order to proceed with the transaction. Prior
* to the optimization that introduced this test, we had 140 SELECTs, now it's 17.
*
* <p>
* See the class javadoc before changing the counts in this test!
*/
@Test

View File

@ -495,16 +495,8 @@ public class FhirResourceDaoR4TagsTest extends BaseResourceProviderR4Test {
public void testInlineTags_Search_Security() {
myStorageSettings.setTagStorageMode(JpaStorageSettings.TagStorageModeEnum.INLINE);
SearchParameter searchParameter = new SearchParameter();
searchParameter.setId("SearchParameter/resource-security");
for (String next : myFhirContext.getResourceTypes().stream().sorted().collect(Collectors.toList())) {
searchParameter.addBase(next);
}
searchParameter.setStatus(Enumerations.PublicationStatus.ACTIVE);
searchParameter.setType(Enumerations.SearchParamType.TOKEN);
searchParameter.setCode("_security");
searchParameter.setName("Security");
searchParameter.setExpression("meta.security");
FhirContext fhirContext = myFhirContext;
SearchParameter searchParameter = createSecuritySearchParameter(fhirContext);
ourLog.debug("SearchParam:\n{}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(searchParameter));
mySearchParameterDao.update(searchParameter, mySrd);
mySearchParamRegistry.forceRefresh();
@ -520,6 +512,21 @@ public class FhirResourceDaoR4TagsTest extends BaseResourceProviderR4Test {
validatePatientSearchResultsForInlineTags(outcome);
}
@Nonnull
public static SearchParameter createSecuritySearchParameter(FhirContext fhirContext) {
SearchParameter searchParameter = new SearchParameter();
searchParameter.setId("SearchParameter/resource-security");
for (String next : fhirContext.getResourceTypes().stream().sorted().collect(Collectors.toList())) {
searchParameter.addBase(next);
}
searchParameter.setStatus(Enumerations.PublicationStatus.ACTIVE);
searchParameter.setType(Enumerations.SearchParamType.TOKEN);
searchParameter.setCode("_security");
searchParameter.setName("Security");
searchParameter.setExpression("meta.security");
return searchParameter;
}
private void validatePatientSearchResultsForInlineTags(Bundle outcome) {
Patient patient;
patient = (Patient) outcome.getEntry().get(0).getResource();

View File

@ -66,6 +66,7 @@ public class SyntheaPerfTest extends BaseJpaTest {
@AfterEach
public void afterEach() {
myFhirContext.getParserOptions().setAutoContainReferenceTargetsWithNoId(true);
myStorageSettings.setInlineResourceTextBelowSize(new JpaStorageSettings().getInlineResourceTextBelowSize());
}
@Disabled("Stress test")

View File

@ -8,13 +8,18 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.model.entity.ResourceHistoryTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.PatientReindexTestHelper;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@ -32,6 +37,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ReindexJobTest extends BaseJpaR4Test {
@ -53,6 +62,143 @@ public class ReindexJobTest extends BaseJpaR4Test {
@AfterEach
public void after() {
myInterceptorRegistry.unregisterAllAnonymousInterceptors();
myStorageSettings.setInlineResourceTextBelowSize(new JpaStorageSettings().getInlineResourceTextBelowSize());
}
@Test
public void testOptimizeStorage_CurrentVersion() {
// Setup
IIdType patientId = createPatient(withActiveTrue());
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId(patientId.toUnqualifiedVersionless());
p.setActive(true);
p.addIdentifier().setValue("" + i);
myPatientDao.update(p, mySrd);
}
for (int i = 0; i < 9; i++) {
createPatient(withActiveTrue());
}
runInTransaction(()->{
assertEquals(20, myResourceHistoryTableDao.count());
for (ResourceHistoryTable history : myResourceHistoryTableDao.findAll()) {
assertNull(history.getResourceTextVc());
assertNotNull(history.getResource());
}
});
myStorageSettings.setInlineResourceTextBelowSize(10000);
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(
new ReindexJobParameters()
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
.setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.NONE)
);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(startResponse);
// validate
runInTransaction(()->{
assertEquals(20, myResourceHistoryTableDao.count());
ResourceHistoryTable history = myResourceHistoryTableDao.findAll().get(0);
if (history.getResourceId().equals(patientId.getIdPartAsLong()) && history.getVersion() < 11) {
assertNull(history.getResourceTextVc());
assertNotNull(history.getResource());
} else {
assertNotNull(history.getResourceTextVc());
assertNull(history.getResource());
}
});
Patient patient = myPatientDao.read(patientId, mySrd);
assertTrue(patient.getActive());
}
@Test
public void testOptimizeStorage_AllVersions() {
// Setup
IIdType patientId = createPatient(withActiveTrue());
for (int i = 0; i < 10; i++) {
Patient p = new Patient();
p.setId(patientId.toUnqualifiedVersionless());
p.setActive(true);
p.addIdentifier().setValue("" + i);
myPatientDao.update(p, mySrd);
}
for (int i = 0; i < 9; i++) {
createPatient(withActiveTrue());
}
runInTransaction(()->{
assertEquals(20, myResourceHistoryTableDao.count());
for (ResourceHistoryTable history : myResourceHistoryTableDao.findAll()) {
assertNull(history.getResourceTextVc());
assertNotNull(history.getResource());
}
});
myStorageSettings.setInlineResourceTextBelowSize(10000);
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(
new ReindexJobParameters()
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.ALL_VERSIONS)
.setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.NONE)
);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(startResponse);
// validate
runInTransaction(()->{
assertEquals(20, myResourceHistoryTableDao.count());
for (ResourceHistoryTable history : myResourceHistoryTableDao.findAll()) {
assertNotNull(history.getResourceTextVc());
assertNull(history.getResource());
}
});
Patient patient = myPatientDao.read(patientId, mySrd);
assertTrue(patient.getActive());
}
@Test
public void testOptimizeStorage_DeletedRecords() {
// Setup
IIdType patientId = createPatient(withActiveTrue());
myPatientDao.delete(patientId, mySrd);
for (int i = 0; i < 9; i++) {
IIdType nextId = createPatient(withActiveTrue());
myPatientDao.delete(nextId, mySrd);
}
myStorageSettings.setInlineResourceTextBelowSize(10000);
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(
new ReindexJobParameters()
.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION)
.setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.NONE)
);
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse);
assertEquals(10, outcome.getCombinedRecordsProcessed());
try {
myPatientDao.read(patientId, mySrd);
fail();
} catch (ResourceGoneException e) {
// good
}
}
@Test
@ -180,12 +326,12 @@ public class ReindexJobTest extends BaseJpaR4Test {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(new ReindexJobParameters());
Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(startRequest);
JobInstance outcome = myBatch2JobHelper.awaitJobFailure(startResponse);
JobInstance outcome = myBatch2JobHelper.awaitJobCompletion(startResponse);
// Verify
assertEquals(StatusEnum.ERRORED, outcome.getStatus());
assertEquals("foo message", outcome.getErrorMessage());
assertEquals(StatusEnum.COMPLETED, outcome.getStatus());
assertEquals(null, outcome.getErrorMessage());
}
@Test

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@ -22,6 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -57,7 +60,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@ -87,7 +90,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@ -120,7 +123,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@ -188,7 +191,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(2, outcome.getRecordsProcessed());
@ -233,7 +236,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id", new ReindexJobParameters());
// Verify
assertEquals(4, outcome.getRecordsProcessed());
@ -247,7 +250,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
verify(myDataSink, times(1)).recoveredError(myErrorCaptor.capture());
String message = myErrorCaptor.getValue();
message = message.replace("Observation.subject.where(resolve() is Patient)", "Observation.subject"); // depending on whether subject or patient gets indexed first
assertEquals("Failure reindexing Patient/" + idPatientToInvalidate + ": HAPI-0928: Failed to parse database resource[Patient/" + idPatientToInvalidate + " (pid " + idPatientToInvalidate + ", version R4): HAPI-1861: Failed to parse JSON encoded FHIR content: HAPI-1859: Content does not appear to be FHIR JSON, first non-whitespace character was: 'A' (must be '{')", message);
assertThat(message, containsString("HAPI-0928: Failed to parse database resource"));
runInTransaction(() -> {
ResourceTable table = myResourceTableDao.findById(idPatientToInvalidate).orElseThrow();

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.search.reindex;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
@ -14,6 +15,7 @@ import ca.uhn.test.concurrency.LockstepEnumPhaser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.SearchParameter;
import org.junit.jupiter.api.Test;
@ -91,8 +93,14 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
return result;
});
assertEquals(1, getSPIDXDateCount(observationPid), "still only one index row before reindex");
ReindexParameters reindexParameters = new ReindexParameters();
reindexParameters.setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.ALL);
reindexParameters.setOptimisticLock(true);
reindexParameters.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.NONE);
// suppose reindex job step starts here and loads the resource and ResourceTable entity
ExecutorService backgroundReindexThread = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder().namingPattern("Reindex-thread-%d").build());
Future<Integer> backgroundResult = backgroundReindexThread.submit(() -> {
@ -104,7 +112,7 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
phaser.assertInPhase(Steps.RUN_REINDEX);
ourLog.info("Run $reindex");
myObservationDao.reindex(JpaPid.fromIdAndResourceType(observationPid, "Observation"), rd, new TransactionDetails());
myObservationDao.reindex(JpaPid.fromIdAndResourceType(observationPid, "Observation"), reindexParameters, rd, new TransactionDetails());
ourLog.info("$reindex done release main thread to delete");
phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX);

View File

@ -345,7 +345,7 @@ public class GiantTransactionPerfTest {
}
}
private class MockResourceHistoryTableDao implements IResourceHistoryTableDao {
private static class MockResourceHistoryTableDao implements IResourceHistoryTableDao {
private int mySaveCount;
@Override
@ -363,6 +363,11 @@ public class GiantTransactionPerfTest {
throw new UnsupportedOperationException();
}
@Override
public Slice<ResourceHistoryTable> findForResourceIdAndReturnEntities(Pageable thePage, Long theId, Long theDontWantVersion) {
throw new UnsupportedOperationException();
}
@Override
public Slice<Long> findIdsOfPreviousVersionsOfResourceId(Pageable thePage, Long theResourceId) {
throw new UnsupportedOperationException();
@ -378,6 +383,11 @@ public class GiantTransactionPerfTest {
throw new UnsupportedOperationException();
}
@Override
public void setResourceTextVcForVersion(Long id, String resourceText) {
throw new UnsupportedOperationException();
}
@Override
public void updateVersion(long theId, long theOldVersion, long theNewVersion) {
throw new UnsupportedOperationException();
@ -388,23 +398,27 @@ public class GiantTransactionPerfTest {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public List<ResourceHistoryTable> findAll() {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public List<ResourceHistoryTable> findAll(Sort sort) {
public List<ResourceHistoryTable> findAll(@Nonnull Sort sort) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public Page<ResourceHistoryTable> findAll(Pageable pageable) {
public Page<ResourceHistoryTable> findAll(@Nonnull Pageable pageable) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public List<ResourceHistoryTable> findAllById(Iterable<Long> ids) {
public List<ResourceHistoryTable> findAllById(@Nonnull Iterable<Long> ids) {
throw new UnsupportedOperationException();
}
@ -414,22 +428,22 @@ public class GiantTransactionPerfTest {
}
@Override
public void deleteById(Long theLong) {
public void deleteById(@Nonnull Long theLong) {
throw new UnsupportedOperationException();
}
@Override
public void delete(ResourceHistoryTable entity) {
public void delete(@Nonnull ResourceHistoryTable entity) {
throw new UnsupportedOperationException();
}
@Override
public void deleteAllById(Iterable<? extends Long> ids) {
public void deleteAllById(@Nonnull Iterable<? extends Long> ids) {
throw new UnsupportedOperationException();
}
@Override
public void deleteAll(Iterable<? extends ResourceHistoryTable> entities) {
public void deleteAll(@Nonnull Iterable<? extends ResourceHistoryTable> entities) {
throw new UnsupportedOperationException();
}
@ -438,24 +452,27 @@ public class GiantTransactionPerfTest {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> S save(S entity) {
public <S extends ResourceHistoryTable> S save(@Nonnull S entity) {
mySaveCount++;
return entity;
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> List<S> saveAll(Iterable<S> entities) {
public <S extends ResourceHistoryTable> List<S> saveAll(@Nonnull Iterable<S> entities) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public Optional<ResourceHistoryTable> findById(@Nonnull Long theLong) {
throw new UnsupportedOperationException();
}
@Override
public Optional<ResourceHistoryTable> findById(Long theLong) {
throw new UnsupportedOperationException();
}
@Override
public boolean existsById(Long theLong) {
public boolean existsById(@Nonnull Long theLong) {
throw new UnsupportedOperationException();
}
@ -464,28 +481,30 @@ public class GiantTransactionPerfTest {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> S saveAndFlush(S entity) {
public <S extends ResourceHistoryTable> S saveAndFlush(@Nonnull S entity) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> List<S> saveAllAndFlush(@Nonnull Iterable<S> entities) {
throw new UnsupportedOperationException();
}
@Override
public <S extends ResourceHistoryTable> List<S> saveAllAndFlush(Iterable<S> entities) {
public void deleteInBatch(@Nonnull Iterable<ResourceHistoryTable> entities) {
throw new UnsupportedOperationException();
}
@Override
public void deleteInBatch(Iterable<ResourceHistoryTable> entities) {
public void deleteAllInBatch(@Nonnull Iterable<ResourceHistoryTable> entities) {
throw new UnsupportedOperationException();
}
@Override
public void deleteAllInBatch(Iterable<ResourceHistoryTable> entities) {
throw new UnsupportedOperationException();
}
@Override
public void deleteAllByIdInBatch(Iterable<Long> ids) {
public void deleteAllByIdInBatch(@Nonnull Iterable<Long> ids) {
throw new UnsupportedOperationException();
}
@ -495,57 +514,62 @@ public class GiantTransactionPerfTest {
}
@Override
public ResourceHistoryTable getOne(Long theLong) {
public ResourceHistoryTable getOne(@Nonnull Long theLong) {
throw new UnsupportedOperationException();
}
@Override
public ResourceHistoryTable getById(Long theLong) {
public ResourceHistoryTable getById(@Nonnull Long theLong) {
throw new UnsupportedOperationException();
}
@Override
public ResourceHistoryTable getReferenceById(Long theLong) {
public ResourceHistoryTable getReferenceById(@Nonnull Long theLong) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> Optional<S> findOne(Example<S> example) {
public <S extends ResourceHistoryTable> Optional<S> findOne(@Nonnull Example<S> example) {
return Optional.empty();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> List<S> findAll(Example<S> example) {
public <S extends ResourceHistoryTable> List<S> findAll(@Nonnull Example<S> example) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> List<S> findAll(@Nonnull Example<S> example, @Nonnull Sort sort) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> Page<S> findAll(@Nonnull Example<S> example, @Nonnull Pageable pageable) {
throw new UnsupportedOperationException();
}
@Override
public <S extends ResourceHistoryTable> List<S> findAll(Example<S> example, Sort sort) {
public <S extends ResourceHistoryTable> long count(@Nonnull Example<S> example) {
throw new UnsupportedOperationException();
}
@Override
public <S extends ResourceHistoryTable> Page<S> findAll(Example<S> example, Pageable pageable) {
public <S extends ResourceHistoryTable> boolean exists(@Nonnull Example<S> example) {
throw new UnsupportedOperationException();
}
@Nonnull
@Override
public <S extends ResourceHistoryTable> long count(Example<S> example) {
throw new UnsupportedOperationException();
}
@Override
public <S extends ResourceHistoryTable> boolean exists(Example<S> example) {
throw new UnsupportedOperationException();
}
@Override
public <S extends ResourceHistoryTable, R> R findBy(Example<S> example, Function<FluentQuery.FetchableFluentQuery<S>, R> queryFunction) {
public <S extends ResourceHistoryTable, R> R findBy(@Nonnull Example<S> example, @Nonnull Function<FluentQuery.FetchableFluentQuery<S>, R> queryFunction) {
throw new UnsupportedOperationException();
}
}
private class MockEntityManager implements EntityManager {
private static class MockEntityManager implements EntityManager {
private final List<Object> myPersistCount = new ArrayList<>();
private final List<Object> myMergeCount = new ArrayList<>();
private long ourNextId = 0L;
@ -819,54 +843,6 @@ public class GiantTransactionPerfTest {
}
}
private static class MockSchedulerSvc implements ISchedulerService {
@Override
public void purgeAllScheduledJobsForUnitTest() {
throw new UnsupportedOperationException();
}
@Override
public void logStatusForUnitTest() {
throw new UnsupportedOperationException();
}
@Override
public void scheduleLocalJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
throw new UnsupportedOperationException();
}
@Override
public void scheduleClusteredJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition) {
throw new UnsupportedOperationException();
}
@Override
public Set<JobKey> getLocalJobKeysForUnitTest() {
throw new UnsupportedOperationException();
}
@Override
public Set<JobKey> getClusteredJobKeysForUnitTest() {
throw new UnsupportedOperationException();
}
@Override
public boolean isStopping() {
return false;
}
@Override
public void triggerLocalJobImmediately(ScheduledJobDefinition theJobDefinition) {
ISchedulerService.super.triggerLocalJobImmediately(theJobDefinition);
}
@Override
public void triggerClusteredJobImmediately(ScheduledJobDefinition theJobDefinition) {
ISchedulerService.super.triggerClusteredJobImmediately(theJobDefinition);
}
}
private static class MockServletRequest extends MockHttpServletRequest {
}

View File

@ -152,7 +152,7 @@ public abstract class BaseColumnCalculatorTask extends BaseTableColumnTask {
}
};
myExecutor = new ThreadPoolExecutor(
1,
maximumPoolSize,
maximumPoolSize,
0L,
TimeUnit.MILLISECONDS,

View File

@ -58,11 +58,13 @@ public class FetchResourceIdsStep implements IFirstJobStepWorker<BulkExportJobPa
public RunOutcome run(@Nonnull StepExecutionDetails<BulkExportJobParameters, VoidModel> theStepExecutionDetails,
@Nonnull IJobDataSink<ResourceIdList> theDataSink) throws JobExecutionFailedException {
BulkExportJobParameters params = theStepExecutionDetails.getParameters();
ourLog.info("Starting BatchExport job");
ourLog.info("Fetching resource IDs for bulk export job instance[{}]", theStepExecutionDetails.getInstance().getInstanceId());
ExportPIDIteratorParameters providerParams = new ExportPIDIteratorParameters();
providerParams.setInstanceId(theStepExecutionDetails.getInstance().getInstanceId());
providerParams.setChunkId(theStepExecutionDetails.getChunkId());
providerParams.setFilters(params.getFilters());
providerParams.setStartDate(params.getStartDate());
providerParams.setStartDate(params.getSince());
providerParams.setExportStyle(params.getExportStyle());
providerParams.setGroupId(params.getGroupId());
providerParams.setPatientIds(params.getPatientIds());

View File

@ -45,7 +45,7 @@ public class BulkExportJobParameters extends BulkExportJobBase {
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
@JsonProperty("since")
private Date myStartDate;
private Date mySince;
@JsonProperty("exportId")
private String myExportId;
@ -90,7 +90,7 @@ public class BulkExportJobParameters extends BulkExportJobBase {
params.setPostFetchFilterUrls(theParameters.getPostFetchFilterUrls());
params.setGroupId(theParameters.getGroupId());
params.setOutputFormat(theParameters.getOutputFormat());
params.setStartDate(theParameters.getStartDate());
params.setSince(theParameters.getSince());
params.setExpandMdm(theParameters.isExpandMdm());
params.setPatientIds(theParameters.getPatientIds());
params.setOriginalRequestUrl(theParameters.getOriginalRequestUrl());
@ -114,12 +114,12 @@ public class BulkExportJobParameters extends BulkExportJobBase {
myResourceTypes = theResourceTypes;
}
public Date getStartDate() {
return myStartDate;
public Date getSince() {
return mySince;
}
public void setStartDate(Date theStartDate) {
myStartDate = theStartDate;
public void setSince(Date theSince) {
mySince = theSince;
}
public List<String> getFilters() {

View File

@ -137,7 +137,7 @@ public class ConsumeFilesStep implements ILastJobStepWorker<BulkImportJobParamet
theTransactionDetails.addResolvedResourceId(next, null);
}
mySystemDao.preFetchResources(resolvedIds);
mySystemDao.preFetchResources(resolvedIds, true);
for (IBaseResource next : theResources) {
updateResource(theRequestDetails, theTransactionDetails, next);

View File

@ -68,7 +68,7 @@ public class ReindexAppCtx {
@Bean
public GenerateRangeChunksStep reindexGenerateRangeChunksStep() {
return new GenerateRangeChunksStep();
return new ReindexGenerateRangeChunksStep();
}
@Bean

View File

@ -0,0 +1,46 @@
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.PartitionedUrlChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
public class ReindexGenerateRangeChunksStep extends GenerateRangeChunksStep<ReindexJobParameters> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexGenerateRangeChunksStep.class);
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<PartitionedUrlChunkRangeJson> theDataSink) throws JobExecutionFailedException {
ReindexJobParameters parameters = theStepExecutionDetails.getParameters();
ourLog.info("Beginning reindex job - OptimizeStorage[{}] - ReindexSearchParameters[{}]", parameters.getOptimizeStorage(), parameters.getReindexSearchParameters());
return super.run(theStepExecutionDetails, theDataSink);
}
}

View File

@ -20,7 +20,56 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlListJobParameters;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ReindexJobParameters extends PartitionedUrlListJobParameters {
public static final String OPTIMIZE_STORAGE = "optimizeStorage";
public static final String REINDEX_SEARCH_PARAMETERS = "reindexSearchParameters";
public static final String OPTIMISTIC_LOCK = "optimisticLock";
@JsonProperty(value = OPTIMIZE_STORAGE, defaultValue = ReindexParameters.OPTIMIZE_STORAGE_DEFAULT_STRING, required = false)
@Nullable
private ReindexParameters.OptimizeStorageModeEnum myOptimizeStorage;
@JsonProperty(value = REINDEX_SEARCH_PARAMETERS, defaultValue = ReindexParameters.REINDEX_SEARCH_PARAMETERS_DEFAULT_STRING, required = false)
@Nullable
private ReindexParameters.ReindexSearchParametersEnum myReindexSearchParameters;
@JsonProperty(value = OPTIMISTIC_LOCK, defaultValue = ReindexParameters.OPTIMISTIC_LOCK_DEFAULT + "", required = false)
@Nullable
private Boolean myOptimisticLock;
public boolean getOptimisticLock() {
return defaultIfNull(myOptimisticLock, ReindexParameters.OPTIMISTIC_LOCK_DEFAULT);
}
public ReindexJobParameters setOptimisticLock(boolean theOptimisticLock) {
myOptimisticLock = theOptimisticLock;
return this;
}
public ReindexParameters.OptimizeStorageModeEnum getOptimizeStorage() {
return defaultIfNull(myOptimizeStorage, ReindexParameters.OPTIMIZE_STORAGE_DEFAULT);
}
public ReindexJobParameters setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum myOptimizeStorage) {
this.myOptimizeStorage = myOptimizeStorage;
return this;
}
public ReindexParameters.ReindexSearchParametersEnum getReindexSearchParameters() {
return defaultIfNull(myReindexSearchParameters, ReindexParameters.REINDEX_SEARCH_PARAMETERS_DEFAULT);
}
public ReindexJobParameters setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum theReindexSearchParameters) {
this.myReindexSearchParameters = theReindexSearchParameters;
return this;
}
}

View File

@ -25,19 +25,28 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import com.google.common.base.Ascii;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import java.util.List;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.OPTIMIZE_STORAGE;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.REINDEX_SEARCH_PARAMETERS;
public class ReindexProvider {
private final FhirContext myFhirContext;
@ -57,11 +66,43 @@ public class ReindexProvider {
@Operation(name = ProviderConstants.OPERATION_REINDEX, idempotent = false)
public IBaseParameters reindex(
@OperationParam(name = ProviderConstants.OPERATION_REINDEX_PARAM_URL, typeName = "string", min = 0, max = OperationParam.MAX_UNLIMITED) List<IPrimitiveType<String>> theUrlsToReindex,
@Description("Optionally provides one ore more relative search parameter URLs (e.g. \"Patient?active=true\" or \"Observation?\") that will be reindexed. Note that the URL applies to the resources as they are currently indexed, so you should not use a search parameter that needs reindexing in the URL or some resources may be missed. If no URLs are provided, all resources of all types will be reindexed.")
@OperationParam(name = ProviderConstants.OPERATION_REINDEX_PARAM_URL, typeName = "string", min = 0, max = OperationParam.MAX_UNLIMITED)
List<IPrimitiveType<String>> theUrlsToReindex,
@Description("Should search parameters be reindexed (default: " + ReindexParameters.REINDEX_SEARCH_PARAMETERS_DEFAULT_STRING + ")")
@OperationParam(name = REINDEX_SEARCH_PARAMETERS, typeName = "code", min = 0, max = 1)
IPrimitiveType<String> theReindexSearchParameters,
@Description("Should we attempt to optimize storage for resources (default: " + ReindexParameters.OPTIMIZE_STORAGE_DEFAULT_STRING + ")")
@OperationParam(name = OPTIMIZE_STORAGE, typeName = "code", min = 0, max = 1)
IPrimitiveType<String> theOptimizeStorage,
@Description("Should we attempt to optimistically lock resources being reindexed in order to avoid concurrency issues (default: " + ReindexParameters.OPTIMISTIC_LOCK_DEFAULT + ")")
@OperationParam(name = ReindexJobParameters.OPTIMISTIC_LOCK, typeName = "boolean", min = 0, max = 1)
IPrimitiveType<Boolean> theOptimisticLock,
RequestDetails theRequestDetails
) {
ReindexJobParameters params = new ReindexJobParameters();
if (theReindexSearchParameters != null) {
String value = theReindexSearchParameters.getValue();
if (value != null) {
value = Ascii.toUpperCase(value);
ValidateUtil.isTrueOrThrowInvalidRequest(EnumUtils.isValidEnum(ReindexParameters.ReindexSearchParametersEnum.class, value), "Invalid " + REINDEX_SEARCH_PARAMETERS + " value: " + UrlUtil.sanitizeUrlPart(theReindexSearchParameters.getValue()));
params.setReindexSearchParameters(ReindexParameters.ReindexSearchParametersEnum.valueOf(value));
}
}
if (theOptimizeStorage != null) {
String value = theOptimizeStorage.getValue();
if (value != null) {
value = Ascii.toUpperCase(value);
ValidateUtil.isTrueOrThrowInvalidRequest(EnumUtils.isValidEnum(ReindexParameters.OptimizeStorageModeEnum.class, value), "Invalid " + OPTIMIZE_STORAGE + " value: " + UrlUtil.sanitizeUrlPart(theOptimizeStorage.getValue()));
params.setOptimizeStorage(ReindexParameters.OptimizeStorageModeEnum.valueOf(value));
}
}
if (theOptimisticLock != null && theOptimisticLock.getValue() != null) {
params.setOptimisticLock(theOptimisticLock.getValue());
}
if (theUrlsToReindex != null) {
theUrlsToReindex.stream()
.map(IPrimitiveType::getValue)

View File

@ -19,21 +19,14 @@
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.api.*;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.dao.*;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
@ -67,17 +60,23 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails, @Nonnull IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
ReindexJobParameters jobParameters = theStepExecutionDetails.getParameters();
return doReindex(data, theDataSink, theStepExecutionDetails.getInstance().getInstanceId(), theStepExecutionDetails.getChunkId());
return doReindex(data, theDataSink, theStepExecutionDetails.getInstance().getInstanceId(), theStepExecutionDetails.getChunkId(), jobParameters);
}
@Nonnull
public RunOutcome doReindex(ResourceIdListWorkChunkJson data, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
public RunOutcome doReindex(ResourceIdListWorkChunkJson data, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId, ReindexJobParameters theJobParameters) {
RequestDetails requestDetails = new SystemRequestDetails();
requestDetails.setRetry(true);
requestDetails.setMaxRetries(REINDEX_MAX_RETRIES);
TransactionDetails transactionDetails = new TransactionDetails();
myHapiTransactionService.execute(requestDetails, transactionDetails, new ReindexJob(data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId));
ReindexJob reindexJob = new ReindexJob(data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId, theJobParameters);
myHapiTransactionService
.withRequest(requestDetails)
.withTransactionDetails(transactionDetails)
.execute(reindexJob);
return new RunOutcome(data.size());
}
@ -89,14 +88,16 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
private final IJobDataSink<VoidModel> myDataSink;
private final String myChunkId;
private final String myInstanceId;
private final ReindexJobParameters myJobParameters;
public ReindexJob(ResourceIdListWorkChunkJson theData, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
public ReindexJob(ResourceIdListWorkChunkJson theData, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId, ReindexJobParameters theJobParameters) {
myData = theData;
myRequestDetails = theRequestDetails;
myTransactionDetails = theTransactionDetails;
myDataSink = theDataSink;
myInstanceId = theInstanceId;
myChunkId = theChunkId;
myJobParameters = theJobParameters;
}
@Override
@ -109,9 +110,15 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
// Prefetch Resources from DB
mySystemDao.preFetchResources(persistentIds);
boolean reindexSearchParameters = myJobParameters.getReindexSearchParameters() != ReindexParameters.ReindexSearchParametersEnum.NONE;
mySystemDao.preFetchResources(persistentIds, reindexSearchParameters);
ourLog.info("Prefetched {} resources in {} - Instance[{}] Chunk[{}]", persistentIds.size(), sw, myInstanceId, myChunkId);
ReindexParameters parameters = new ReindexParameters()
.setReindexSearchParameters(myJobParameters.getReindexSearchParameters())
.setOptimizeStorage(myJobParameters.getOptimizeStorage())
.setOptimisticLock(myJobParameters.getOptimisticLock());
// Reindex
sw.restart();
@ -121,7 +128,10 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Resourc
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
IResourcePersistentId<?> resourcePersistentId = persistentIds.get(i);
try {
dao.reindex(resourcePersistentId, myRequestDetails, myTransactionDetails);
ReindexOutcome outcome = dao.reindex(resourcePersistentId, parameters, myRequestDetails, myTransactionDetails);
outcome.getWarnings().forEach(myDataSink::recoveredError);
} catch (BaseServerResponseException | DataFormatException e) {
String resourceForcedId = myIdHelperService.translatePidIdToForcedIdWithCache(resourcePersistentId).orElse(resourcePersistentId.toString());
String resourceId = nextResourceType + "/" + resourceForcedId;

View File

@ -84,7 +84,7 @@ public class ExpandResourcesStepTest {
parameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
parameters.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
parameters.setOutputFormat("json");
parameters.setStartDate(new Date());
parameters.setSince(new Date());
if (thePartitioned) {
parameters.setPartitionId(RequestPartitionId.fromPartitionName("Partition-A"));
}

View File

@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -45,6 +46,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.hamcrest.Matchers.containsString;
@ExtendWith(MockitoExtension.class)
public class FetchResourceIdsStepTest {
@ -73,7 +75,7 @@ public class FetchResourceIdsStepTest {
private BulkExportJobParameters createParameters(boolean thePartitioned) {
BulkExportJobParameters jobParameters = new BulkExportJobParameters();
jobParameters.setStartDate(new Date());
jobParameters.setSince(new Date());
jobParameters.setOutputFormat("json");
jobParameters.setExportStyle(BulkDataExportOptions.ExportStyle.PATIENT);
jobParameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
@ -159,10 +161,10 @@ public class FetchResourceIdsStepTest {
ArgumentCaptor<ILoggingEvent> logCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
verify(myAppender, atLeastOnce()).doAppend(logCaptor.capture());
List<ILoggingEvent> events = logCaptor.getAllValues();
assertTrue(events.get(0).getMessage().contains("Starting BatchExport job"));
assertTrue(events.get(1).getMessage().contains("Running FetchResource"));
assertTrue(events.get(2).getMessage().contains("Running FetchResource"));
assertTrue(events.get(3).getFormattedMessage().contains("Submitted "
assertThat(events.get(0).getMessage(), containsString("Fetching resource IDs for bulk export job instance"));
assertThat(events.get(1).getMessage(), containsString("Running FetchResource"));
assertThat(events.get(2).getMessage(), containsString("Running FetchResource"));
assertThat(events.get(3).getFormattedMessage(), containsString("Submitted "
+ parameters.getResourceTypes().size()
+ " groups of ids for processing"
));

View File

@ -104,7 +104,7 @@ public class WriteBinaryStepTest {
JobInstance theInstance,
boolean thePartitioned) {
BulkExportJobParameters parameters = new BulkExportJobParameters();
parameters.setStartDate(new Date());
parameters.setSince(new Date());
parameters.setResourceTypes(Arrays.asList("Patient", "Observation"));
parameters.setPartitionId(getPartitionId(thePartitioned));
return new StepExecutionDetails<>(

View File

@ -6,11 +6,13 @@ import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.DecimalType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
@ -32,6 +34,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
@ -119,15 +123,19 @@ public class ReindexProviderTest {
ReindexJobParameters params = myStartRequestCaptor.getValue().getParameters(ReindexJobParameters.class);
assertThat(params.getPartitionedUrls(), hasSize(1));
assertEquals(url, params.getPartitionedUrls().get(0).getUrl());
// Default values
assertEquals(ReindexParameters.ReindexSearchParametersEnum.ALL, params.getReindexSearchParameters());
assertTrue(params.getOptimisticLock());
assertEquals(ReindexParameters.OptimizeStorageModeEnum.NONE, params.getOptimizeStorage());
}
@Test
public void testReindex_NoUrl() {
// setup
Parameters input = new Parameters();
int batchSize = 2401;
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize));
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_EVERYTHING, new BooleanType(true));
input.addParameter(ReindexJobParameters.REINDEX_SEARCH_PARAMETERS, new CodeType("none"));
input.addParameter(ReindexJobParameters.OPTIMISTIC_LOCK, new BooleanType(false));
input.addParameter(ReindexJobParameters.OPTIMIZE_STORAGE, new CodeType("current_version"));
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
@ -138,7 +146,7 @@ public class ReindexProviderTest {
.operation()
.onServer()
.named(ProviderConstants.OPERATION_REINDEX)
.withNoParameters(Parameters.class)
.withParameters(input)
.execute();
// Verify
@ -150,6 +158,10 @@ public class ReindexProviderTest {
verify(myJobCoordinator, times(1)).startInstance(myStartRequestCaptor.capture());
ReindexJobParameters params = myStartRequestCaptor.getValue().getParameters(ReindexJobParameters.class);
assertThat(params.getPartitionedUrls(), empty());
// Non-default values
assertEquals(ReindexParameters.ReindexSearchParametersEnum.NONE, params.getReindexSearchParameters());
assertFalse(params.getOptimisticLock());
assertEquals(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION, params.getOptimizeStorage());
}
}

View File

@ -238,6 +238,14 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
*/
void reindex(T theResource, IBasePersistedResource theEntity);
/**
* Reindex the given resource
*
* @param theResourcePersistentId The ID
* @return
*/
ReindexOutcome reindex(IResourcePersistentId theResourcePersistentId, ReindexParameters theReindexParameters, RequestDetails theRequest, TransactionDetails theTransactionDetails);
void removeTag(IIdType theId, TagTypeEnum theTagType, String theSystem, String theCode, RequestDetails theRequestDetails);
void removeTag(IIdType theId, TagTypeEnum theTagType, String theSystem, String theCode);
@ -346,10 +354,4 @@ public interface IFhirResourceDao<T extends IBaseResource> extends IDao {
return read(theReferenceElement.toVersionless()).getIdElement().getVersionIdPart();
}
/**
* Reindex the given resource
*
* @param theResourcePersistentId The ID
*/
void reindex(IResourcePersistentId theResourcePersistentId, RequestDetails theRequest, TransactionDetails theTransactionDetails);
}

View File

@ -88,8 +88,10 @@ public interface IFhirSystemDao<T, MT> extends IDao {
/**
* Preload resources from the database in batch. This method is purely
* a performance optimization and must be purely idempotent.
*
* @param thePreFetchIndexes Should resource indexes be loaded
*/
default <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds) {
default <P extends IResourcePersistentId> void preFetchResources(List<P> theResolvedIds, boolean thePreFetchIndexes) {
// nothing by default
}
}

View File

@ -0,0 +1,43 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.dao;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ReindexOutcome {
private List<String> myWarnings;
public List<String> getWarnings() {
return defaultIfNull(myWarnings, Collections.emptyList());
}
public void addWarning(String theWarning) {
if (myWarnings == null) {
myWarnings = new ArrayList<>();
}
myWarnings.add(theWarning);
}
}

View File

@ -0,0 +1,71 @@
/*-
* #%L
* HAPI FHIR Storage api
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.api.dao;
public class ReindexParameters {
public static final ReindexSearchParametersEnum REINDEX_SEARCH_PARAMETERS_DEFAULT = ReindexSearchParametersEnum.ALL;
public static final String REINDEX_SEARCH_PARAMETERS_DEFAULT_STRING = "ALL";
public static final boolean OPTIMISTIC_LOCK_DEFAULT = true;
public static final OptimizeStorageModeEnum OPTIMIZE_STORAGE_DEFAULT = OptimizeStorageModeEnum.NONE;
public static final String OPTIMIZE_STORAGE_DEFAULT_STRING = "NONE";
private ReindexSearchParametersEnum myReindexSearchParameters = REINDEX_SEARCH_PARAMETERS_DEFAULT;
private OptimizeStorageModeEnum myOptimizeStorage = OPTIMIZE_STORAGE_DEFAULT;
private boolean myOptimisticLock = OPTIMISTIC_LOCK_DEFAULT;
public boolean isOptimisticLock() {
return myOptimisticLock;
}
public ReindexParameters setOptimisticLock(boolean theOptimisticLock) {
myOptimisticLock = theOptimisticLock;
return this;
}
public ReindexSearchParametersEnum getReindexSearchParameters() {
return myReindexSearchParameters;
}
public ReindexParameters setReindexSearchParameters(ReindexSearchParametersEnum theReindexSearchParameters) {
myReindexSearchParameters = theReindexSearchParameters;
return this;
}
public OptimizeStorageModeEnum getOptimizeStorage() {
return myOptimizeStorage;
}
public ReindexParameters setOptimizeStorage(OptimizeStorageModeEnum theOptimizeStorage) {
myOptimizeStorage = theOptimizeStorage;
return this;
}
public enum ReindexSearchParametersEnum {
ALL,
NONE
}
public enum OptimizeStorageModeEnum {
NONE,
CURRENT_VERSION,
ALL_VERSIONS
}
}

View File

@ -38,7 +38,7 @@ public class BulkExportParameters extends Batch2BaseJobParameters {
/**
* The earliest date from which to export resources.
*/
private Date myStartDate;
private Date mySince;
/**
* Filters are used to narrow down the resources to export.
@ -121,12 +121,12 @@ public class BulkExportParameters extends Batch2BaseJobParameters {
myResourceTypes = theResourceTypes;
}
public Date getStartDate() {
return myStartDate;
public Date getSince() {
return mySince;
}
public void setStartDate(Date theStartDate) {
myStartDate = theStartDate;
public void setSince(Date theSince) {
mySince = theSince;
}
public List<String> getFilters() {

View File

@ -51,33 +51,37 @@ public class ExportPIDIteratorParameters {
* (Batch jobs are stored in Persistence, to keep track
* of results/status).
*/
private String myJobId;
private String myInstanceId;
private String myChunkId;
/**
* The export style
*/
private BulkDataExportOptions.ExportStyle myExportStyle;
/**
* the group id
*/
private String myGroupId;
/**
* For group export - whether or not to expand mdm
*/
private boolean myExpandMdm;
/**
* The patient id
*/
private List<String> myPatientIds;
/**
* The partition id
*/
private RequestPartitionId myPartitionId;
public String getChunkId() {
return myChunkId;
}
public void setChunkId(String theChunkId) {
myChunkId = theChunkId;
}
public String getResourceType() {
return myResourceType;
}
@ -102,12 +106,12 @@ public class ExportPIDIteratorParameters {
myFilters = theFilters;
}
public String getJobId() {
return myJobId;
public String getInstanceId() {
return myInstanceId;
}
public void setJobId(String theJobId) {
myJobId = theJobId;
public void setInstanceId(String theInstanceId) {
myInstanceId = theInstanceId;
}
public BulkDataExportOptions.ExportStyle getExportStyle() {

View File

@ -378,6 +378,7 @@ public class BulkDataExportProvider {
response.getWriter().close();
break;
default:
// Deliberate fall through
ourLog.warn("Unrecognized status encountered: {}. Treating as BUILDING/SUBMITTED", info.getStatus().name());
//noinspection fallthrough
case BUILDING:

View File

@ -37,7 +37,7 @@ public class BulkExportUtils {
public static BulkExportParameters createBulkExportJobParametersFromExportOptions(BulkDataExportOptions theOptions) {
BulkExportParameters parameters = new BulkExportParameters(Batch2JobDefinitionConstants.BULK_EXPORT);
parameters.setStartDate(theOptions.getSince());
parameters.setSince(theOptions.getSince());
parameters.setOutputFormat(theOptions.getOutputFormat());
parameters.setExportStyle(theOptions.getExportStyle());
parameters.setFilters(new ArrayList<>(theOptions.getFilters()));

View File

@ -43,8 +43,7 @@ public final class ThreadPoolUtil {
@Nonnull
public static ThreadPoolTaskExecutor newThreadPool(int theCorePoolSize, int theMaxPoolSize, String theThreadNamePrefix, int theQueueCapacity, TaskDecorator taskDecorator) {
assert theCorePoolSize == theMaxPoolSize || theQueueCapacity == 0 : "If the queue capacity is greater than 0, core pool size needs to match max pool size or the system won't grow the queue";
Validate.isTrue(theCorePoolSize == theMaxPoolSize || theQueueCapacity == 0, "If the queue capacity is greater than 0, core pool size needs to match max pool size or the system won't grow the queue");
Validate.isTrue(theThreadNamePrefix.endsWith("-"), "Thread pool prefix name must end with a hyphen");
ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
asyncTaskExecutor.setCorePoolSize(theCorePoolSize);