3755 use batch2 job after search parameter change (#3806)

* broke some tests

* invoke batch 2 reindex

* changelog

* fix compile error after merge

* fix failing test

* hook up partition support

* fix some failing tests

* deprecate the old reindex service

* fix more failing tests

* fix broken test
JasonRoberts-smile 2022-07-21 15:26:15 -04:00 committed by GitHub
parent 85384feac7
commit f5697e13c9
32 changed files with 206 additions and 117 deletions


@ -0,0 +1,5 @@
---
type: change
issue: 3755
title: "When the Mark Resources for Reindexing after SearchParameter change configuration parameter is enabled, SMILE will
use the Batch 2 framework to perform the reindexing operation."
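
For orientation, the new code path these diffs introduce in BaseHapiFhirResourceDao#requestReindexForRelatedResources can be summarized with the sketch below. This is a condensed reading of the hunks in this commit, not the verbatim method; the request-partition lookup and debug logging are omitted, and the field names (myUrlPartitioner, myJobCoordinator) are the ones added by this change.

    // condensed sketch of the new reindex kickoff (see the BaseHapiFhirResourceDao hunks below)
    if (getConfig().isMarkResourcesForReindexingUponSearchParameterChange()) {
        ReindexJobParameters params = new ReindexJobParameters();

        // one partitioned "Type?" URL per SearchParameter base, e.g. "Patient?"
        theBase.stream()
            .map(type -> type + "?")
            .map(url -> myUrlPartitioner.partitionUrl(url, theRequestDetails))
            .forEach(params::addPartitionedUrl);

        // a single Batch2 reindex job replaces the old row-marking reindex service
        JobInstanceStartRequest request = new JobInstanceStartRequest();
        request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
        request.setParameters(params);
        myJobCoordinator.startInstance(request);
    }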


@ -56,7 +56,7 @@ import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Transactional
@Transactional(Transactional.TxType.REQUIRES_NEW)
public class JpaJobPersistenceImpl implements IJobPersistence {
private static final Logger ourLog = LoggerFactory.getLogger(JpaJobPersistenceImpl.class);


@ -946,20 +946,22 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
/**
* Subclasses may override to provide behaviour. Called when a resource has been inserted into the database for the first time.
*
* @param theEntity The entity being updated (Do not modify the entity! Undefined behaviour will occur!)
* @param theResource The resource being persisted
* @param theEntity The entity being updated (Do not modify the entity! Undefined behaviour will occur!)
* @param theResource The resource being persisted
* @param theRequestDetails The request details, needed for partition support
*/
protected void postPersist(ResourceTable theEntity, T theResource) {
protected void postPersist(ResourceTable theEntity, T theResource, RequestDetails theRequestDetails) {
// nothing
}
/**
* Subclasses may override to provide behaviour. Called when a pre-existing resource has been updated in the database
*
* @param theEntity The resource
* @param theResource The resource being persisted
* @param theEntity The resource
* @param theResource The resource being persisted
* @param theRequestDetails The request details, needed for partition support
*/
protected void postUpdate(ResourceTable theEntity, T theResource) {
protected void postUpdate(ResourceTable theEntity, T theResource, RequestDetails theRequestDetails) {
// nothing
}
@ -1401,7 +1403,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
myEntityManager.persist(entity.getForcedId());
}
postPersist(entity, (T) theResource);
postPersist(entity, (T) theResource, theRequest);
} else if (entity.getDeleted() != null) {
entity = myEntityManager.merge(entity);
@ -1411,7 +1413,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
} else {
entity = myEntityManager.merge(entity);
postUpdate(entity, (T) theResource);
postUpdate(entity, (T) theResource, theRequest);
}
if (theCreateNewHistoryEntry) {


@ -20,11 +20,17 @@ package ca.uhn.fhir.jpa.dao;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
@ -56,7 +62,6 @@ import ca.uhn.fhir.jpa.patch.JsonPatchUtils;
import ca.uhn.fhir.jpa.patch.XmlPatchUtils;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.cache.SearchCacheStatusEnum;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -109,7 +114,6 @@ import ca.uhn.fhir.validation.ValidationOptions;
import ca.uhn.fhir.validation.ValidationResult;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.apache.commons.text.WordUtils;
import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseMetaType;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
@ -120,7 +124,6 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
@ -147,7 +150,6 @@ import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -164,8 +166,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Autowired
private MatchResourceUrlService myMatchResourceUrlService;
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
private SearchBuilderFactory mySearchBuilderFactory;
@Autowired
private DaoRegistry myDaoRegistry;
@ -175,6 +175,8 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
private MatchUrlService myMatchUrlService;
@Autowired
private IDeleteExpungeJobSubmitter myDeleteExpungeJobSubmitter;
@Autowired
private IJobCoordinator myJobCoordinator;
private IInstanceValidatorModule myInstanceValidator;
private String myResourceName;
@ -184,6 +186,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
private MemoryCacheService myMemoryCacheService;
private TransactionTemplate myTxTemplate;
@Autowired
private UrlPartitioner myUrlPartitioner;
@Override
public DaoMethodOutcome create(final T theResource) {
return create(theResource, null, true, new TransactionDetails(), null);
@ -575,7 +580,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
myDeleteConflictService.validateOkToDelete(theDeleteConflicts, entity, false, theRequestDetails, theTransactionDetails);
preDelete(resourceToDelete, entity);
preDelete(resourceToDelete, entity, theRequestDetails);
// Notify interceptors
if (theRequestDetails != null) {
@ -969,7 +974,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return pagingProvider != null;
}
protected void markResourcesMatchingExpressionAsNeedingReindexing(Boolean theCurrentlyReindexing, String theExpression) {
protected void requestReindexForRelatedResources(Boolean theCurrentlyReindexing, List<String> theBase, RequestDetails theRequestDetails) {
// Avoid endless loops
if (Boolean.TRUE.equals(theCurrentlyReindexing)) {
return;
@ -977,26 +982,24 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
if (getConfig().isMarkResourcesForReindexingUponSearchParameterChange()) {
String expression = defaultString(theExpression);
ReindexJobParameters params = new ReindexJobParameters();
Set<String> typesToMark = myDaoRegistry
.getRegisteredDaoTypes()
theBase
.stream()
.filter(t -> WordUtils.containsAllWords(expression, t))
.collect(Collectors.toSet());
.map(t -> t + "?")
.map(url -> myUrlPartitioner.partitionUrl(url, theRequestDetails))
.forEach(params::addPartitionedUrl);
for (String resourceType : typesToMark) {
ourLog.debug("Marking all resources of type {} for reindexing due to updated search parameter with path: {}", resourceType, theExpression);
ReadPartitionIdRequestDetails details= new ReadPartitionIdRequestDetails(null, RestOperationTypeEnum.EXTENDED_OPERATION_SERVER, null, null, null);
RequestPartitionId requestPartition = myRequestPartitionHelperService.determineReadPartitionForRequest(theRequestDetails, null, details);
params.setRequestPartitionId(requestPartition);
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.execute(t -> {
myResourceReindexingSvc.markAllResourcesForReindexing(resourceType);
return null;
});
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
request.setParameters(params);
myJobCoordinator.startInstance(request);
ourLog.debug("Marked resources of type {} for reindexing", resourceType);
}
ourLog.debug("Started reindex job with parameters {}", params);
}
@ -1197,7 +1200,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
* Subclasses may override to provide behaviour. Invoked within a delete
* transaction with the resource that is about to be deleted.
*/
protected void preDelete(T theResourceToDelete, ResourceTable theEntityToDelete) {
protected void preDelete(T theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
// nothing by default
}


@ -26,10 +26,14 @@ import ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoSearchParameterR4;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor;
import ca.uhn.fhir.model.dstu2.resource.SearchParameter;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import com.google.common.collect.Lists;
import org.hl7.fhir.convertors.advisors.impl.BaseAdvisor_10_40;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_10_40;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -39,29 +43,30 @@ public class FhirResourceDaoSearchParameterDstu2 extends BaseHapiFhirResourceDao
private ISearchParamExtractor mySearchParamExtractor;
private FhirContext myDstu2Hl7OrgContext = FhirContext.forDstu2Hl7Org();
protected void markAffectedResources(SearchParameter theResource) {
protected void reindexAffectedResources(SearchParameter theResource, RequestDetails theRequestDetails) {
Boolean reindex = theResource != null ? CURRENTLY_REINDEXING.get(theResource) : null;
String expression = theResource != null ? theResource.getXpath() : null;
markResourcesMatchingExpressionAsNeedingReindexing(reindex, expression);
List<String> base = theResource != null ? Lists.newArrayList(theResource.getBase()) : null;
requestReindexForRelatedResources(reindex, base, theRequestDetails);
}
@Override
protected void postPersist(ResourceTable theEntity, SearchParameter theResource) {
super.postPersist(theEntity, theResource);
markAffectedResources(theResource);
protected void postPersist(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theResource, theRequestDetails);
reindexAffectedResources(theResource, theRequestDetails);
}
@Override
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource) {
super.postUpdate(theEntity, theResource);
markAffectedResources(theResource);
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postUpdate(theEntity, theResource, theRequestDetails);
reindexAffectedResources(theResource, theRequestDetails);
}
@Override
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
markAffectedResources(theResourceToDelete);
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
reindexAffectedResources(theResourceToDelete, theRequestDetails);
}
@Override


@ -62,8 +62,8 @@ public class FhirResourceDaoSubscriptionDstu2 extends BaseHapiFhirResourceDao<Su
@Override
protected void postPersist(ResourceTable theEntity, Subscription theSubscription) {
super.postPersist(theEntity, theSubscription);
protected void postPersist(ResourceTable theEntity, Subscription theSubscription, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theSubscription, theRequestDetails);
createSubscriptionTable(theEntity, theSubscription);
}


@ -53,7 +53,6 @@ import javax.transaction.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -145,8 +144,8 @@ public class FhirResourceDaoCodeSystemDstu3 extends BaseHapiFhirResourceDao<Code
}
@Override
protected void preDelete(CodeSystem theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
protected void preDelete(CodeSystem theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete);


@ -5,11 +5,16 @@ import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoSearchParameterR4;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.convertors.advisors.impl.BaseAdvisor_30_40;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_30_40;
import org.hl7.fhir.dstu3.model.PrimitiveType;
import org.hl7.fhir.dstu3.model.SearchParameter;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.stream.Collectors;
/*
* #%L
* HAPI FHIR JPA Server
@ -35,29 +40,30 @@ public class FhirResourceDaoSearchParameterDstu3 extends BaseHapiFhirResourceDao
@Autowired
private ISearchParamExtractor mySearchParamExtractor;
protected void markAffectedResources(SearchParameter theResource) {
protected void reindexAffectedResources(SearchParameter theResource, RequestDetails theRequestDetails) {
Boolean reindex = theResource != null ? CURRENTLY_REINDEXING.get(theResource) : null;
String expression = theResource != null ? theResource.getExpression() : null;
markResourcesMatchingExpressionAsNeedingReindexing(reindex, expression);
List<String> base = theResource != null ? theResource.getBase().stream().map(PrimitiveType::asStringValue).collect(Collectors.toList()) : null;
requestReindexForRelatedResources(reindex, base, theRequestDetails);
}
@Override
protected void postPersist(ResourceTable theEntity, SearchParameter theResource) {
super.postPersist(theEntity, theResource);
markAffectedResources(theResource);
protected void postPersist(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theResource, theRequestDetails);
reindexAffectedResources(theResource, theRequestDetails);
}
@Override
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource) {
super.postUpdate(theEntity, theResource);
markAffectedResources(theResource);
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postUpdate(theEntity, theResource, theRequestDetails);
reindexAffectedResources(theResource, theRequestDetails);
}
@Override
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
markAffectedResources(theResourceToDelete);
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
reindexAffectedResources(theResourceToDelete, theRequestDetails);
}
@Override


@ -59,8 +59,8 @@ public class FhirResourceDaoSubscriptionDstu3 extends BaseHapiFhirResourceDao<Su
@Override
protected void postPersist(ResourceTable theEntity, Subscription theSubscription) {
super.postPersist(theEntity, theSubscription);
protected void postPersist(ResourceTable theEntity, Subscription theSubscription, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theSubscription, theRequestDetails);
createSubscriptionTable(theEntity, theSubscription);
}


@ -52,7 +52,6 @@ import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import static ca.uhn.fhir.jpa.dao.FhirResourceDaoValueSetDstu2.toStringOrNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -146,8 +145,8 @@ public class FhirResourceDaoCodeSystemR4 extends BaseHapiFhirResourceDao<CodeSys
}
@Override
protected void preDelete(CodeSystem theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
protected void preDelete(CodeSystem theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete);


@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoSearchParameter;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
@ -17,11 +18,14 @@ import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.instance.model.api.IBase;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.SearchParameter;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -51,29 +55,30 @@ public class FhirResourceDaoSearchParameterR4 extends BaseHapiFhirResourceDao<Se
@Autowired
private ISearchParamExtractor mySearchParamExtractor;
protected void markAffectedResources(SearchParameter theResource) {
protected void reindexAffectedResources(SearchParameter theResource, RequestDetails theRequestDetails) {
Boolean reindex = theResource != null ? CURRENTLY_REINDEXING.get(theResource) : null;
String expression = theResource != null ? theResource.getExpression() : null;
markResourcesMatchingExpressionAsNeedingReindexing(reindex, expression);
List<String> base = theResource != null ? theResource.getBase().stream().map(CodeType::getCode).collect(Collectors.toList()) : null;
requestReindexForRelatedResources(reindex, base, theRequestDetails);
}
@Override
protected void postPersist(ResourceTable theEntity, SearchParameter theResource) {
super.postPersist(theEntity, theResource);
markAffectedResources(theResource);
protected void postPersist(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theResource, theRequestDetails);
reindexAffectedResources(theResource, theRequestDetails);
}
@Override
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource) {
super.postUpdate(theEntity, theResource);
markAffectedResources(theResource);
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postUpdate(theEntity, theResource, theRequestDetails);
reindexAffectedResources(theResource, theRequestDetails);
}
@Override
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
markAffectedResources(theResourceToDelete);
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
reindexAffectedResources(theResourceToDelete, theRequestDetails);
}
@Override


@ -59,8 +59,8 @@ public class FhirResourceDaoSubscriptionR4 extends BaseHapiFhirResourceDao<Subsc
@Override
protected void postPersist(ResourceTable theEntity, Subscription theSubscription) {
super.postPersist(theEntity, theSubscription);
protected void postPersist(ResourceTable theEntity, Subscription theSubscription, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theSubscription, theRequestDetails);
createSubscriptionTable(theEntity, theSubscription);
}


@ -53,7 +53,6 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import static ca.uhn.fhir.jpa.dao.FhirResourceDaoValueSetDstu2.toStringOrNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -147,8 +146,8 @@ public class FhirResourceDaoCodeSystemR5 extends BaseHapiFhirResourceDao<CodeSys
}
@Override
protected void preDelete(CodeSystem theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
protected void preDelete(CodeSystem theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
myTermDeferredStorageSvc.deleteCodeSystemForResource(theEntityToDelete);


@ -5,11 +5,16 @@ import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.dao.r4.FhirResourceDaoSearchParameterR4;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.extractor.ISearchParamExtractor;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.convertors.advisors.impl.BaseAdvisor_40_50;
import org.hl7.fhir.convertors.factory.VersionConvertorFactory_40_50;
import org.hl7.fhir.r5.model.CodeType;
import org.hl7.fhir.r5.model.SearchParameter;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.stream.Collectors;
/*
* #%L
* HAPI FHIR JPA Server
@ -35,29 +40,30 @@ public class FhirResourceDaoSearchParameterR5 extends BaseHapiFhirResourceDao<Se
@Autowired
private ISearchParamExtractor mySearchParamExtractor;
protected void markAffectedResources(SearchParameter theResource) {
protected void refactorAffectedResources(SearchParameter theResource, RequestDetails theRequestDetails) {
Boolean reindex = theResource != null ? CURRENTLY_REINDEXING.get(theResource) : null;
String expression = theResource != null ? theResource.getExpression() : null;
markResourcesMatchingExpressionAsNeedingReindexing(reindex, expression);
List<String> base = theResource != null ? theResource.getBase().stream().map(CodeType::getCode).collect(Collectors.toList()) : null;
requestReindexForRelatedResources(reindex, base, theRequestDetails);
}
@Override
protected void postPersist(ResourceTable theEntity, SearchParameter theResource) {
super.postPersist(theEntity, theResource);
markAffectedResources(theResource);
protected void postPersist(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theResource, theRequestDetails);
refactorAffectedResources(theResource, theRequestDetails);
}
@Override
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource) {
super.postUpdate(theEntity, theResource);
markAffectedResources(theResource);
protected void postUpdate(ResourceTable theEntity, SearchParameter theResource, RequestDetails theRequestDetails) {
super.postUpdate(theEntity, theResource, theRequestDetails);
refactorAffectedResources(theResource, theRequestDetails);
}
@Override
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete) {
super.preDelete(theResourceToDelete, theEntityToDelete);
markAffectedResources(theResourceToDelete);
protected void preDelete(SearchParameter theResourceToDelete, ResourceTable theEntityToDelete, RequestDetails theRequestDetails) {
super.preDelete(theResourceToDelete, theEntityToDelete, theRequestDetails);
refactorAffectedResources(theResourceToDelete, theRequestDetails);
}
@Override


@ -59,8 +59,8 @@ public class FhirResourceDaoSubscriptionR5 extends BaseHapiFhirResourceDao<Subsc
@Override
protected void postPersist(ResourceTable theEntity, Subscription theSubscription) {
super.postPersist(theEntity, theSubscription);
protected void postPersist(ResourceTable theEntity, Subscription theSubscription, RequestDetails theRequestDetails) {
super.postPersist(theEntity, theSubscription, theRequestDetails);
createSubscriptionTable(theEntity, theSubscription);
}


@ -20,6 +20,11 @@ package ca.uhn.fhir.jpa.search.reindex;
* #L%
*/
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
/**
* @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
*/
public interface IResourceReindexingSvc {
/**

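The deprecation note above points existing callers at the Batch2 coordinator. A minimal before/after migration sketch, assuming a caller that previously held an IResourceReindexingSvc and now holds an IJobCoordinator (both names as they appear in this commit's diffs):

    // migration sketch, not verbatim from this commit
    // before (deprecated): mark one resource type for the background reindexer
    myResourceReindexingSvc.markAllResourcesForReindexing("Patient");

    // after: submit a Batch2 reindex job
    JobInstanceStartRequest request = new JobInstanceStartRequest();
    request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
    request.setParameters(new ReindexJobParameters()); // add partitioned "Type?" URLs as shown earlier
    myJobCoordinator.startInstance(request);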

@ -41,6 +41,9 @@ import org.springframework.stereotype.Service;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
* @deprecated
*/
@Service
public class ResourceReindexer {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexer.class);


@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.search.reindex;
* #L%
*/
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
@ -79,7 +80,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
* @see ca.uhn.fhir.jpa.reindex.job.ReindexJobConfig
* @deprecated
* @deprecated Use the Batch2 {@link ca.uhn.fhir.batch2.api.IJobCoordinator#startInstance(JobInstanceStartRequest)} instead.
*/
@Deprecated
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {


@ -145,4 +145,8 @@ public class Batch2JobHelper {
}
}
}
public List<JobInstance> findJobsByDefinition(String theJobDefinitionId) {
return myJobCoordinator.getInstancesbyJobDefinitionIdAndEndedStatus(theJobDefinitionId, null, 100, 0);
}
}


@ -105,6 +105,10 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
}
protected void createUniqueCompositeSp() {
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/patient-birthdate");
sp.setType(Enumerations.SearchParamType.DATE);
@ -112,8 +116,12 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
sp.setExpression("Patient.birthDate");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Patient");
mySearchParameterDao.update(sp);
mySearchParameterDao.update(sp, mySrd);
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
sp = new SearchParameter();
sp.setId("SearchParameter/patient-birthdate-unique");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
@ -125,7 +133,7 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
sp.addExtension()
.setUrl(HapiExtensions.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);
mySearchParameterDao.update(sp, mySrd);
mySearchParamRegistry.forceRefresh();
}


@ -449,7 +449,7 @@ public class FhirResourceDaoR4ComboUniqueParamTest extends BaseComboParamsR4Test
sp.setCode("patient");
sp.setName("patient");
sp.setType(Enumerations.SearchParamType.REFERENCE);
sp.addBase(ServiceRequest.class.getName());
sp.addBase(ServiceRequest.class.getSimpleName());
sp.setExpression("ServiceRequest.subject.where(resolve() is Patient)");
String patientParamId = mySearchParameterDao.create(sp).getId().toUnqualifiedVersionless().getValue();
@ -458,7 +458,7 @@ public class FhirResourceDaoR4ComboUniqueParamTest extends BaseComboParamsR4Test
sp.setCode("performer");
sp.setName("performer");
sp.setType(Enumerations.SearchParamType.REFERENCE);
sp.addBase(ServiceRequest.class.getName());
sp.addBase(ServiceRequest.class.getSimpleName());
sp.setExpression("ServiceRequest.performer");
String performerParamId = mySearchParameterDao.create(sp).getId().toUnqualifiedVersionless().getValue();
@ -467,7 +467,7 @@ public class FhirResourceDaoR4ComboUniqueParamTest extends BaseComboParamsR4Test
sp.setCode("identifier");
sp.setName("identifier");
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.addBase(ServiceRequest.class.getName());
sp.addBase(ServiceRequest.class.getSimpleName());
sp.setExpression("ServiceRequest.identifier");
String identifierParamId = mySearchParameterDao.create(sp).getId().toUnqualifiedVersionless().getValue();


@ -1,5 +1,7 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.context.phonetic.PhoneticEncoderEnum;
import ca.uhn.fhir.i18n.Msg;
@ -69,6 +71,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.hamcrest.MatcherAssert.assertThat;
@ -478,15 +481,13 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test
fooSp.setXpathUsage(org.hl7.fhir.r4.model.SearchParameter.XPathUsageType.NORMAL);
fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE);
List<JobInstance> initialJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
mySearchParameterDao.create(fooSp, mySrd);
assertEquals(1, myResourceReindexingSvc.forceReindexingPass());
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
assertEquals(0, myResourceReindexingSvc.forceReindexingPass());
List<JobInstance> finalJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
List<JobInstance> newJobs = finalJobs.stream().filter(t -> !initialJobs.contains(t)).collect(Collectors.toList());
assertEquals(1, newJobs.size(), "number of jobs created");
}
@Test


@ -954,7 +954,7 @@ public class FhirResourceDaoR4SearchOptimizedTest extends BaseJpaR4Test {
// See this PR for a similar type of Fix: https://github.com/hapifhir/hapi-fhir/pull/2909
// SearchParam - focalAccess
SearchParameter searchParameter1 = new SearchParameter();
searchParameter1.addBase("BodySite").addBase("Procedure");
searchParameter1.addBase("BodyStructure").addBase("Procedure");
searchParameter1.setCode("focalAccess");
searchParameter1.setType(Enumerations.SearchParamType.REFERENCE);
searchParameter1.setExpression("Procedure.extension('Procedure#focalAccess')");


@ -104,6 +104,8 @@ public class FhirResourceDaoR4SearchSqlTest extends BaseJpaR4Test {
@Test
public void testSearchByProfile_InlineMode() {
myDaoConfig.setTagStorageMode(DaoConfig.TagStorageModeEnum.INLINE);
boolean reindexParamCache = myDaoConfig.isMarkResourcesForReindexingUponSearchParameterChange();
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(false);
SearchParameter searchParameter = FhirResourceDaoR4TagsTest.createSearchParamForInlineResourceProfile();
ourLog.info("SearchParam:\n{}", myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(searchParameter));
@ -132,6 +134,7 @@ public class FhirResourceDaoR4SearchSqlTest extends BaseJpaR4Test {
assertThat(toUnqualifiedVersionlessIds(outcome), Matchers.contains(id));
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(reindexParamCache);
}


@ -48,6 +48,9 @@ public class PartitioningNonNullDefaultPartitionR4Test extends BasePartitioningR
@Test
public void testCreateAndSearch_NonPartitionable() {
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
SearchParameter sp = new SearchParameter();
sp.addBase("Patient");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
@ -77,6 +80,9 @@ public class PartitioningNonNullDefaultPartitionR4Test extends BasePartitioningR
@Test
public void testCreateAndSearch_NonPartitionable_ForcedId() {
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/A");
sp.addBase("Patient");


@ -112,6 +112,10 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
@Test
public void testCreateSearchParameter_DefaultPartition() {
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
SearchParameter sp = new SearchParameter();
sp.addBase("Patient");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
@ -119,7 +123,7 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
sp.setCode("extpatorg");
sp.setName("extpatorg");
sp.setExpression("Patient.extension('http://patext').value.as(Reference)");
Long id = mySearchParameterDao.create(sp).getId().getIdPartAsLong();
Long id = mySearchParameterDao.create(sp, mySrd).getId().getIdPartAsLong();
runInTransaction(() -> {
ResourceTable resourceTable = myResourceTableDao.findById(id).orElseThrow(IllegalArgumentException::new);
@ -286,6 +290,9 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
@Test
public void testCreateSearchParameter_DefaultPartitionWithDate() {
addCreateDefaultPartition(myPartitionDate);
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
SearchParameter sp = new SearchParameter();
sp.addBase("Patient");


@ -45,6 +45,10 @@ public class ReindexJobTest extends BaseJpaR4Test {
public void testReindex_ByUrl() {
// setup
// make sure the resources don't get auto-reindexed when the search parameter is created
boolean reindexPropertyCache = myDaoConfig.isMarkResourcesForReindexingUponSearchParameterChange();
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(false);
IIdType obsFinalId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED);
@ -72,6 +76,8 @@ public class ReindexJobTest extends BaseJpaR4Test {
List<String> alleleObservationIds = myReindexTestHelper.getAlleleObservationIds();
assertThat(alleleObservationIds, hasSize(1));
assertEquals(obsFinalId.getIdPart(), alleleObservationIds.get(0));
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(reindexPropertyCache);
}
@Test


@ -1,8 +1,10 @@
package ca.uhn.fhir.jpa.provider.dstu3;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
@ -200,9 +202,10 @@ public class ResourceProviderCustomSearchParamDstu3Test extends BaseResourceProv
mySearchParameterDao.create(fooSp, mySrd);
runInTransaction(()->{
List<ResourceReindexJobEntity> allJobs = myResourceReindexJobDao.findAll();
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
assertEquals(1, allJobs.size());
assertEquals("Patient", allJobs.get(0).getResourceType());
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());
});
}


@ -36,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProviderR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(MultitenantBatchOperationR4Test.class);
private boolean myReindexParameterCache;
@BeforeEach
@Override
public void before() throws Exception {
@ -43,6 +45,9 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
myDaoConfig.setAllowMultipleDelete(true);
myDaoConfig.setExpungeEnabled(true);
myDaoConfig.setDeleteExpungeEnabled(true);
myReindexParameterCache = myDaoConfig.isMarkResourcesForReindexingUponSearchParameterChange();
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(false);
}
@BeforeEach
@ -58,6 +63,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
myDaoConfig.setExpungeEnabled(new DaoConfig().isExpungeEnabled());
myDaoConfig.setDeleteExpungeEnabled(new DaoConfig().isDeleteExpungeEnabled());
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(myReindexParameterCache);
super.after();
}


@ -1,11 +1,13 @@
package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.entity.Search;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
@ -240,9 +242,10 @@ public class ResourceProviderCustomSearchParamR4Test extends BaseResourceProvide
mySearchParameterDao.create(fooSp, mySrd);
runInTransaction(() -> {
List<ResourceReindexJobEntity> allJobs = myResourceReindexJobDao.findAll();
List<JobInstance> allJobs = myBatch2JobHelper.findJobsByDefinition(ReindexAppCtx.JOB_REINDEX);
assertEquals(1, allJobs.size());
assertEquals("Patient", allJobs.get(0).getResourceType());
assertEquals(1, allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().size());
assertEquals("Patient?", allJobs.get(0).getParameters(ReindexJobParameters.class).getPartitionedUrls().get(0).getUrl());
});
}


@ -245,7 +245,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
@Test
public void testParameterWithNoValueThrowsError_InvalidChainOnCustomSearch() throws IOException {
SearchParameter searchParameter = new SearchParameter();
searchParameter.addBase("BodySite").addBase("Procedure");
searchParameter.addBase("BodyStructure").addBase("Procedure");
searchParameter.setCode("focalAccess");
searchParameter.setType(Enumerations.SearchParamType.REFERENCE);
searchParameter.setExpression("Procedure.extension('Procedure#focalAccess')");
@ -267,7 +267,7 @@ public class ResourceProviderR4Test extends BaseResourceProviderR4Test {
@Test
public void testParameterWithNoValueThrowsError_InvalidRootParam() throws IOException {
SearchParameter searchParameter = new SearchParameter();
searchParameter.addBase("BodySite").addBase("Procedure");
searchParameter.addBase("BodyStructure").addBase("Procedure");
searchParameter.setCode("focalAccess");
searchParameter.setType(Enumerations.SearchParamType.REFERENCE);
searchParameter.setExpression("Procedure.extension('Procedure#focalAccess')");


@ -140,6 +140,8 @@ public class ReindexStepTest extends BaseJpaR4Test {
// Setup
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
boolean markResourcesForReindexingUponSearchParameterChange = myDaoConfig.isMarkResourcesForReindexingUponSearchParameterChange();
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(false); // if this were true, it would set up a lot of reindex jobs extraneous to the one we're trying to test
IIdType orgId = createOrganization(withId("ORG"));
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON"), withOrganization(orgId)).getIdPartAsLong();
@ -196,6 +198,8 @@ public class ReindexStepTest extends BaseJpaR4Test {
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
myDaoConfig.setMarkResourcesForReindexingUponSearchParameterChange(markResourcesForReindexingUponSearchParameterChange);
}
@Test