Merge remote-tracking branch 'origin/master' into dmuylwyk-contained-search-param

This commit is contained in:
Diederik Muylwyk 2023-09-07 00:32:38 -04:00
commit 8c5976d55f
180 changed files with 3928 additions and 360 deletions

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1001,9 +1001,9 @@ public class FhirContext {
*/
public void registerCustomType(final Class<? extends IBase> theType) {
Validate.notNull(theType, "theType must not be null");
ensureCustomTypeList();
myCustomTypes.add(theType);
myResourceNames = null;
}
/**
@ -1025,6 +1025,7 @@ public class FhirContext {
ensureCustomTypeList();
myCustomTypes.addAll(theTypes);
myResourceNames = null;
}
private BaseRuntimeElementDefinition<?> scanDatatype(final Class<? extends IElement> theResourceType) {

View File

@ -40,7 +40,7 @@ public class MethodOutcome {
private IBaseResource myResource;
private Map<String, List<String>> myResponseHeaders;
private Collection<Runnable> myResourceViewCallbacks;
private int myResponseStatusCode;
private Integer myResponseStatusCode;
/**
* Constructor
@ -258,6 +258,10 @@ public class MethodOutcome {
}
public int getResponseStatusCode() {
return myResponseStatusCode;
return isResponseStatusCodeSet() ? myResponseStatusCode : 0;
}
public boolean isResponseStatusCodeSet() {
return myResponseStatusCode != null;
}
}

View File

@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
@ -12,7 +12,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -544,6 +544,7 @@ public class MethodUtil {
}
MethodOutcome retVal = new MethodOutcome();
retVal.setResponseStatusCode(theResponseStatusCode);
if (locationHeaders.size() > 0) {
String locationHeader = locationHeaders.get(0);
BaseOutcomeReturningMethodBinding.parseContentLocation(theContext, retVal, locationHeader);

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5242
title: "Previously, `$mdm-clear` failed to expunge `REDIRECTED` golden resources which left them as orphans. This is now fixed."

View File

@ -0,0 +1,4 @@
---
type: add
issue: 5246
title: "Combined the ExpandResources step and WriteBinary step in the new WriteBinary step v2 for bulk exports."

View File

@ -0,0 +1,7 @@
---
type: fix
issue: 5254
title: "Previously, when bulk export is enabled the resource list would be generated and would
not be able to add a custom resource to that list. Now once a custom resource is added the list is
rebuilt.
"

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5256
title: "Added RequestDetails as part of the parameters when cleaning up possible matches to enable interceptors to
access it."

View File

@ -0,0 +1,7 @@
---
type: change
issue: 5229
title: "Previously, when using INLINE tag storage mode, a superfluous version of a resource would be created as a result
of an update request which didn't have a real logical change to the resource but only changed the order of existing
items in tag, security label or profile collections. This change prevents this behaviour. Also on resource retrieval,
these meta collections are sorted alphabetically, based on (security, code) pair for tags and security labels."

View File

@ -0,0 +1,4 @@
---
type: fix
jira: SMILE-7307
title: "Previously, executing a Group Bulk Export without defining the `_type` parameter would accidentally omit `Patient` and `Organization` types. This has been corrected."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5268
title: "Previously, the response status code set in a `MethodOutcome` of a Resource provider was not respected.
This has been fixed."

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 5276
title: "Previously, GraphQL queries will error when using base resource search parameters such as '_id' after search parameter rebuild.
This has been fixed."

View File

@ -0,0 +1,26 @@
This release introduces significant a change to the mechanism performing submission of resource modification events
to the message broker. Previously, an event would be submitted as part of the synchronous transaction
modifying a resource. Synchronous submission yielded responsive publishing with the caveat that events would be dropped
upon submission failure.
We have replaced the synchronous mechanism with a two stage process. Events are initially stored in
database upon completion of the transaction and subsequently submitted to the broker by a scheduled task.
This new asynchronous submission mechanism will introduce a slight delay in event publishing. It is our view that such
delay is largely compensated by the capability to retry submission upon failure which will eliminate event losses.
There are some potentially breaking changes:
* On resource retrieval and before storage, tags, security label and profile collections in resource meta will be
sorted in lexicographical order. The order of the elements for Coding types (i.e. tags and security labels) is defined
by the (security, code) pair of each element. This normally should not break any clients because these properties are
sets according to the FHIR specification, and hence the order of the elements in these collections should not matter.
Also with this change the following side effects can be observed:
- If using INLINE tag storage mode, the first update request to a resource which has tags, security
labels or profiles could create a superfluous resource version if the update request does not really introduce any
change to the resource. This is because the persisted tags, security labels, and profile may not be sorted in
lexicographical order, and this would be interpreted as a new resource version since the tags would be sorted
before storage after this change. If the update request actually changes the resource, there is no concern here.
Also, subsequent updates will not create an additional version because of ordering of the meta properties anymore.
- These meta collections are sorted in place by the storage layer before persisting the resource, so any piece of
code that is calling storage layer directly should not be passing in unmodifiable collections, as it would
result in an error.

View File

@ -1,4 +1,4 @@
# FQL Driver: SQL For FHIR Repositories
# HFQL Driver: SQL For FHIR Repositories
<div class="helpInfoCalloutBox">
This is an <a href="https://smilecdr.com/docs/introduction/maturity_model.html">experimental module</a>. Use with caution. This API is likely to change.

View File

@ -114,17 +114,18 @@ Here is a description of how each section of this document is configured.
These define one or more fields which must have a match before two resources are considered for matching.
This is like a list of "pre-searches" that find potential candidates for matches,
to avoid the expensive operation of running a match score calculation on all resources in the system.
`candidateSearchParameters` are capable of making exact searches and phonetic searches
(see the list of [phonetic search parameters](https://smilecdr.com/docs/fhir_repository/search_parameter_phonetic.html))
`candidateSearchParams` are capable of making searches using any SearchParameter defined in the system.
For example, [phonetic SearchParameters](https://smilecdr.com/docs/fhir_repository/search_parameter_phonetic.html)
can be useful here when matchFields include phonetic matchers.
E.g. you may only wish to consider matching two Patients if they either share at least one identifier in
common or have the same birthday or the same phone number. The HAPI FHIR server executes each of these searches
separately and then takes the union of the results, so you can think of these as `OR` criteria that
cast a wide net for potential candidates. In some MDM systems, these "pre-searches" are called "blocking"
searches (since they identify "blocks" of candidates that will be searched for matches).
If a list of searchParams is specified in a given candidateSearchParams item,
If a list of searchParams is specified in a given `candidateSearchParams` item,
then these search parameters are treated as `AND` parameters.
In the following candidateSearchParams definition, hapi-fhir will extract given name,
In the following `candidateSearchParams` definition, hapi-fhir will extract given name,
family name and identifiers from the incoming Patient and perform two separate searches,
first for all Patient resources that have the same given `AND` the same family name as
the incoming Patient, and second for all Patient resources that share at least one

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -57,6 +57,7 @@ import ca.uhn.fhir.jpa.dao.MatchResourceUrlService;
import ca.uhn.fhir.jpa.dao.ObservationLastNIndexPersistSvc;
import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.TransactionProcessor;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.dao.data.IResourceSearchUrlDao;
import ca.uhn.fhir.jpa.dao.expunge.ExpungeEverythingService;
import ca.uhn.fhir.jpa.dao.expunge.ExpungeOperation;
@ -155,6 +156,7 @@ import ca.uhn.fhir.jpa.searchparam.extractor.IResourceLinkResolver;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessagePersistenceSvcImpl;
import ca.uhn.fhir.jpa.term.TermCodeSystemStorageSvcImpl;
import ca.uhn.fhir.jpa.term.TermConceptMappingSvcImpl;
import ca.uhn.fhir.jpa.term.TermReadSvcImpl;
@ -181,6 +183,9 @@ import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;
import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.util.IMetaTagSorter;
import ca.uhn.fhir.util.MetaTagSorterAlphabetical;
import ca.uhn.hapi.converters.canonical.VersionCanonicalizer;
import org.hl7.fhir.common.hapi.validation.support.UnknownCodeSystemWarningValidationSupport;
import org.hl7.fhir.utilities.graphql.IGraphQLStorageServices;
@ -891,4 +896,19 @@ public class JpaConfig {
public IMdmClearHelperSvc<JpaPid> helperSvc(IDeleteExpungeSvc<JpaPid> theDeleteExpungeSvc) {
return new MdmClearHelperSvcImpl(theDeleteExpungeSvc);
}
@Bean
public IResourceModifiedMessagePersistenceSvc subscriptionMessagePersistence(
FhirContext theFhirContext,
IResourceModifiedDao theIResourceModifiedDao,
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService) {
return new ResourceModifiedMessagePersistenceSvcImpl(
theFhirContext, theIResourceModifiedDao, theDaoRegistry, theHapiTransactionService);
}
@Bean
public IMetaTagSorter metaTagSorter() {
return new MetaTagSorterAlphabetical();
}
}

View File

@ -2087,6 +2087,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
break;
}
}
myMetaTagSorter.sort(retVal);
return retVal;
}

View File

@ -54,6 +54,7 @@ import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.parser.LenientErrorHandler;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.util.IMetaTagSorter;
import ca.uhn.fhir.util.MetaUtil;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IAnyResource;
@ -98,6 +99,9 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
@Autowired
private ExternallyStoredResourceServiceRegistry myExternallyStoredResourceServiceRegistry;
@Autowired
IMetaTagSorter myMetaTagSorter;
@Override
public IBaseResource toResource(IBasePersistedResource theEntity, boolean theForHistoryOperation) {
RuntimeResourceDefinition type = myFhirContext.getResourceDefinition(theEntity.getResourceType());
@ -229,6 +233,9 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
// 7. Add partition information
populateResourcePartitionInformation(theEntity, retVal);
// 8. sort tags, security labels and profiles
myMetaTagSorter.sort(retVal.getMeta());
return retVal;
}

View File

@ -0,0 +1,42 @@
package ca.uhn.fhir.jpa.dao.data;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.List;
public interface IResourceModifiedDao
extends JpaRepository<ResourceModifiedEntity, PersistedResourceModifiedMessageEntityPK>,
IHapiFhirJpaRepository {
@Query("SELECT r FROM ResourceModifiedEntity r ORDER BY r.myCreatedTime ASC")
List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime();
@Modifying
@Query("delete from ResourceModifiedEntity r where r.myResourceModifiedEntityPK =:pk")
int removeById(@Param("pk") PersistedResourceModifiedMessageEntityPK thePK);
}

View File

@ -437,6 +437,16 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.references(enversRevisionTable, revColumnName);
}
{
Builder.BuilderAddTableByColumns resourceModifiedTable =
version.addTableByColumns("20230315.1", "HFJ_RESOURCE_MODIFIED", "RES_ID", "RES_VER");
resourceModifiedTable.addColumn("RES_ID").nonNullable().type(ColumnTypeEnum.STRING, 256);
resourceModifiedTable.addColumn("RES_VER").nonNullable().type(ColumnTypeEnum.STRING, 8);
resourceModifiedTable.addColumn("CREATED_TIME").nonNullable().type(ColumnTypeEnum.DATE_TIMESTAMP);
resourceModifiedTable.addColumn("SUMMARY_MESSAGE").nonNullable().type(ColumnTypeEnum.STRING, 4000);
resourceModifiedTable.addColumn("RESOURCE_TYPE").nonNullable().type(ColumnTypeEnum.STRING, 40);
}
{
// The pre-release already contains the long version of this column
// We do this becausea doing a modifyColumn on Postgres (and possibly other RDBMS's) will fail with a nasty

View File

@ -0,0 +1,181 @@
package ca.uhn.fhir.jpa.subscription;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with;
/**
* This implementer provides the capability to persist subscription messages for asynchronous submission
* to the subscription processing pipeline with the purpose of offering a retry mechanism
* upon submission failure (see @link {@link AsyncResourceModifiedSubmitterSvc}).
*/
public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModifiedMessagePersistenceSvc {
private final FhirContext myFhirContext;
private final IResourceModifiedDao myResourceModifiedDao;
private final DaoRegistry myDaoRegistry;
private final ObjectMapper myObjectMapper;
private final HapiTransactionService myHapiTransactionService;
private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedMessagePersistenceSvcImpl.class);
public ResourceModifiedMessagePersistenceSvcImpl(
FhirContext theFhirContext,
IResourceModifiedDao theResourceModifiedDao,
DaoRegistry theDaoRegistry,
HapiTransactionService theHapiTransactionService) {
myFhirContext = theFhirContext;
myResourceModifiedDao = theResourceModifiedDao;
myDaoRegistry = theDaoRegistry;
myHapiTransactionService = theHapiTransactionService;
myObjectMapper = new ObjectMapper();
}
@Override
public List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime() {
return myHapiTransactionService.withSystemRequest().execute(myResourceModifiedDao::findAllOrderedByCreatedTime);
}
@Override
public IPersistedResourceModifiedMessage persist(ResourceModifiedMessage theMsg) {
ResourceModifiedEntity resourceModifiedEntity = createEntityFrom(theMsg);
return myResourceModifiedDao.save(resourceModifiedEntity);
}
@Override
public ResourceModifiedMessage inflatePersistedResourceModifiedMessage(
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
return inflateResourceModifiedMessageFromEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage);
}
@Override
public long getMessagePersistedCount() {
return myResourceModifiedDao.count();
}
@Override
public boolean deleteByPK(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
int removedCount =
myResourceModifiedDao.removeById((PersistedResourceModifiedMessageEntityPK) theResourceModifiedPK);
return removedCount == 1;
}
protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity(
ResourceModifiedEntity theResourceModifiedEntity) {
String resourcePid =
theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid();
String resourceVersion =
theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion();
String resourceType = theResourceModifiedEntity.getResourceType();
ResourceModifiedMessage retVal =
getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage());
SystemRequestDetails systemRequestDetails =
new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId());
IdDt resourceIdDt = new IdDt(resourceType, resourcePid, resourceVersion);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType);
IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true);
retVal.setNewPayload(myFhirContext, iBaseResource);
return retVal;
}
ResourceModifiedEntity createEntityFrom(ResourceModifiedMessage theMsg) {
IIdType theMsgId = theMsg.getPayloadId(myFhirContext);
ResourceModifiedEntity resourceModifiedEntity = new ResourceModifiedEntity();
resourceModifiedEntity.setResourceModifiedEntityPK(with(theMsgId.getIdPart(), theMsgId.getVersionIdPart()));
String partialModifiedMessage = getPayloadLessMessageAsString(theMsg);
resourceModifiedEntity.setSummaryResourceModifiedMessage(partialModifiedMessage);
resourceModifiedEntity.setResourceType(theMsgId.getResourceType());
resourceModifiedEntity.setCreatedTime(new Date());
return resourceModifiedEntity;
}
private ResourceModifiedMessage getPayloadLessMessageFromString(String thePayloadLessMessage) {
try {
return myObjectMapper.readValue(thePayloadLessMessage, ResourceModifiedMessage.class);
} catch (JsonProcessingException e) {
throw new ConfigurationException(Msg.code(2334) + "Failed to json deserialize payloadless message", e);
}
}
private String getPayloadLessMessageAsString(ResourceModifiedMessage theMsg) {
ResourceModifiedMessage tempMessage = new PayloadLessResourceModifiedMessage(theMsg);
try {
return myObjectMapper.writeValueAsString(tempMessage);
} catch (JsonProcessingException e) {
throw new ConfigurationException(Msg.code(2335) + "Failed to serialize empty ResourceModifiedMessage", e);
}
}
private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage {
public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) {
this.myPayloadId = theMsg.getPayloadId();
this.myPayloadVersion = theMsg.getPayloadVersion();
setSubscriptionId(theMsg.getSubscriptionId());
setMediaType(theMsg.getMediaType());
setOperationType(theMsg.getOperationType());
setPartitionId(theMsg.getPartitionId());
setTransactionId(theMsg.getTransactionId());
setMessageKey(theMsg.getMessageKeyOrNull());
copyAdditionalPropertiesFrom(theMsg);
}
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,7 +3,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,7 +3,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.ips.strategy.DefaultIpsGenerationStrategy;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.valueset.BundleEntrySearchModeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
@ -499,7 +500,7 @@ public class IpsGeneratorSvcImplTest {
IFhirResourceDao<Patient> patientDao = registerResourceDaoWithNoData(Patient.class);
Patient patient = new Patient();
patient.setId(PATIENT_ID);
when(patientDao.read(any(), any())).thenReturn(patient);
when(patientDao.read(any(), any(RequestDetails.class))).thenReturn(patient);
}
private void registerRemainingResourceDaos() {

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -36,7 +36,7 @@ import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.util.DateRangeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
@ -82,8 +82,11 @@ public class GoldenResourceSearchSvcImpl implements IGoldenResourceSearchSvc {
DateRangeParam chunkDateRange =
DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd);
searchParamMap.setLastUpdated(chunkDateRange);
searchParamMap.add(
"_tag", new TokenParam(MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS, MdmConstants.CODE_GOLDEN_RECORD));
TokenOrListParam goldenRecordStatusToken = new TokenOrListParam()
.add(MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS, MdmConstants.CODE_GOLDEN_RECORD_REDIRECTED)
.add(MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS, MdmConstants.CODE_GOLDEN_RECORD);
searchParamMap.add(Constants.PARAM_TAG, goldenRecordStatusToken);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
SystemRequestDetails request = new SystemRequestDetails();

View File

@ -74,7 +74,7 @@ class MdmSubscriptionLoaderTest {
Subscription subscription = new Subscription();
IdType id = new IdType("2401");
subscription.setIdElement(id);
when(mySubscriptionDao.read(eq(id), any())).thenThrow(new ResourceGoneException(""));
when(mySubscriptionDao.read(eq(id), any(RequestDetails.class))).thenThrow(new ResourceGoneException(""));
mySvc.updateIfNotPresent(subscription);
verify(mySubscriptionDao).update(eq(subscription), any(RequestDetails.class));
}
@ -84,7 +84,7 @@ class MdmSubscriptionLoaderTest {
Subscription subscription = new Subscription();
IdType id = new IdType("2401");
subscription.setIdElement(id);
when(mySubscriptionDao.read(eq(id), any())).thenThrow(new ResourceNotFoundException(""));
when(mySubscriptionDao.read(eq(id), any(RequestDetails.class))).thenThrow(new ResourceNotFoundException(""));
mySvc.updateIfNotPresent(subscription);
verify(mySubscriptionDao).update(eq(subscription), any(RequestDetails.class));
}
@ -94,7 +94,7 @@ class MdmSubscriptionLoaderTest {
Subscription subscription = new Subscription();
IdType id = new IdType("2401");
subscription.setIdElement(id);
when(mySubscriptionDao.read(eq(id), any())).thenReturn(subscription);
when(mySubscriptionDao.read(eq(id), any(RequestDetails.class))).thenReturn(subscription);
mySvc.updateIfNotPresent(subscription);
verify(mySubscriptionDao, never()).update(any(), any(RequestDetails.class));
}
@ -106,7 +106,7 @@ class MdmSubscriptionLoaderTest {
when(myMdmSettings.getMdmRules()).thenReturn(mdmRulesJson);
when(myChannelNamer.getChannelName(any(), any())).thenReturn("Test");
when(myDaoRegistry.getResourceDao(eq("Subscription"))).thenReturn(mySubscriptionDao);
when(mySubscriptionDao.read(any(), any())).thenThrow(new ResourceGoneException(""));
when(mySubscriptionDao.read(any(), any(RequestDetails.class))).thenThrow(new ResourceGoneException(""));
mySvc.daoUpdateMdmSubscriptions();

View File

@ -4,12 +4,13 @@ import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.mdm.helper.MdmHelperR4;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import ca.uhn.fhir.jpa.test.config.TestSubscriptionMatcherInterceptorConfig;
import org.hl7.fhir.dstu2.model.Subscription;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
@Import({SubscriptionSubmitterConfig.class, SubscriptionChannelConfig.class})
@Import({TestSubscriptionMatcherInterceptorConfig.class, SubscriptionSubmitterConfig.class, SubscriptionChannelConfig.class})
public class TestMdmConfigR4 extends BaseTestMdmConfig {
@Bean
MdmHelperR4 mdmHelperR4() {

View File

@ -8,6 +8,7 @@ import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearStep;
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
@ -17,7 +18,10 @@ import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.hapi.rest.server.helper.BatchHelperR4;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.DecimalType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
@ -175,11 +179,49 @@ public class MdmProviderClearLinkR4Test extends BaseLinkR4Test {
assertNoHistoricalLinksExist(List.of(myPractitionerGoldenResourceId.getValueAsString(), mySourcePatientId.getValueAsString()), new ArrayList<>());
}
@Test
public void testClearAllLinks_deletesRedirectedGoldenResources() {
createPatientAndUpdateLinks(buildJanePatient());
assertLinkCount(3);
List<IBaseResource> allGoldenPatients = getAllGoldenPatients();
assertThat(allGoldenPatients, hasSize(2));
IIdType redirectedGoldenPatientId = allGoldenPatients.get(0).getIdElement().toVersionless();
IIdType goldenPatientId = allGoldenPatients.get(1).getIdElement().toVersionless();
myMdmProvider.mergeGoldenResources(new StringType(redirectedGoldenPatientId.getValueAsString()),
new StringType(goldenPatientId.getValueAsString()),
null,
myRequestDetails);
Patient redirectedGoldenPatient = myPatientDao.read(redirectedGoldenPatientId, myRequestDetails);
List<Coding> patientTags = redirectedGoldenPatient.getMeta().getTag();
assertTrue(patientTags.stream()
.anyMatch(tag -> tag.getCode().equals(MdmConstants.CODE_GOLDEN_RECORD_REDIRECTED)));
assertLinkCount(4);
clearMdmLinks();
assertNoLinksExist();
try {
myPatientDao.read(redirectedGoldenPatientId, myRequestDetails);
fail();
} catch (ResourceNotFoundException e) {
assertEquals(Constants.STATUS_HTTP_404_NOT_FOUND, e.getStatusCode());
assertNoGoldenPatientsExist();
}
}
private void assertNoLinksExist() {
assertNoPatientLinksExist();
assertNoPractitionerLinksExist();
}
private void assertNoGoldenPatientsExist() {
assertThat(getAllGoldenPatients(), hasSize(0));
}
private void assertNoPatientLinksExist() {
assertThat(getPatientLinks(), hasSize(0));
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,27 @@
/*-
* #%L
* HAPI FHIR JPA Model
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.model.entity;
public interface IPersistedResourceModifiedMessage {
IPersistedResourceModifiedMessagePK getPersistedResourceModifiedMessagePk();
String getResourceType();
}

View File

@ -0,0 +1,27 @@
/*-
* #%L
* HAPI FHIR JPA Model
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.model.entity;
public interface IPersistedResourceModifiedMessagePK {
String getResourcePid();
String getResourceVersion();
}

View File

@ -0,0 +1,78 @@
package ca.uhn.fhir.jpa.model.entity;
/*-
* #%L
* HAPI FHIR JPA Model
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.Serializable;
import java.util.Objects;
import javax.persistence.Column;
import javax.persistence.Embeddable;
@Embeddable
public class PersistedResourceModifiedMessageEntityPK implements IPersistedResourceModifiedMessagePK, Serializable {
@Column(name = "RES_ID", length = 256, nullable = false)
private String myResourcePid;
@Column(name = "RES_VER", length = 8, nullable = false)
private String myResourceVersion;
public String getResourcePid() {
return myResourcePid;
}
public PersistedResourceModifiedMessageEntityPK setResourcePid(String theResourcePid) {
myResourcePid = theResourcePid;
return this;
}
public String getResourceVersion() {
return myResourceVersion;
}
public PersistedResourceModifiedMessageEntityPK setResourceVersion(String theResourceVersion) {
myResourceVersion = theResourceVersion;
return this;
}
public static PersistedResourceModifiedMessageEntityPK with(String theResourcePid, String theResourceVersion) {
return new PersistedResourceModifiedMessageEntityPK()
.setResourcePid(theResourcePid)
.setResourceVersion(theResourceVersion);
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
if (theO == null || getClass() != theO.getClass()) return false;
PersistedResourceModifiedMessageEntityPK that = (PersistedResourceModifiedMessageEntityPK) theO;
return myResourcePid.equals(that.myResourcePid) && myResourceVersion.equals(that.myResourceVersion);
}
@Override
public int hashCode() {
return Objects.hash(myResourcePid, myResourceVersion);
}
@Override
public String toString() {
return myResourcePid + "/" + myResourceVersion;
}
}

View File

@ -0,0 +1,99 @@
package ca.uhn.fhir.jpa.model.entity;
/*-
* #%L
* HAPI FHIR JPA Model
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.Serializable;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
/**
* This class describes how a resourceModifiedMessage is stored for later processing in the event where
* submission to the subscription processing pipeline would fail. The persisted message does not include a
* payload (resource) as an in-memory version of the same message would. Instead, it points to a payload
* through the entity primary key {@link PersistedResourceModifiedMessageEntityPK} which is composed
* of the resource Pid and current version.
*/
@Entity
@Table(name = "HFJ_RESOURCE_MODIFIED")
public class ResourceModifiedEntity implements IPersistedResourceModifiedMessage, Serializable {
public static final int MESSAGE_LENGTH = 4000;
@EmbeddedId
private PersistedResourceModifiedMessageEntityPK myResourceModifiedEntityPK;
@Column(name = "SUMMARY_MESSAGE", length = MESSAGE_LENGTH, nullable = false)
private String mySummaryResourceModifiedMessage;
@Column(name = "CREATED_TIME", nullable = false)
@Temporal(TemporalType.TIMESTAMP)
private Date myCreatedTime;
@Column(name = "RESOURCE_TYPE", length = ResourceTable.RESTYPE_LEN, nullable = false)
private String myResourceType;
public PersistedResourceModifiedMessageEntityPK getResourceModifiedEntityPK() {
return myResourceModifiedEntityPK;
}
public ResourceModifiedEntity setResourceModifiedEntityPK(
PersistedResourceModifiedMessageEntityPK theResourceModifiedEntityPK) {
myResourceModifiedEntityPK = theResourceModifiedEntityPK;
return this;
}
@Override
public String getResourceType() {
return myResourceType;
}
public ResourceModifiedEntity setResourceType(String theResourceType) {
myResourceType = theResourceType;
return this;
}
public Date getCreatedTime() {
return myCreatedTime;
}
public void setCreatedTime(Date theCreatedTime) {
myCreatedTime = theCreatedTime;
}
public String getSummaryResourceModifiedMessage() {
return mySummaryResourceModifiedMessage;
}
public ResourceModifiedEntity setSummaryResourceModifiedMessage(String theSummaryResourceModifiedMessage) {
mySummaryResourceModifiedMessage = theSummaryResourceModifiedMessage;
return this;
}
@Override
public IPersistedResourceModifiedMessagePK getPersistedResourceModifiedMessagePk() {
return myResourceModifiedEntityPK;
}
}

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationSvc;
import ca.uhn.fhir.util.HapiExtensions;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.dstu2.model.Subscription;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
@ -781,6 +782,15 @@ public class StorageSettings {
return Collections.unmodifiableSet(mySupportedSubscriptionTypes);
}
/**
* Indicate whether a subscription channel type is supported by this server.
*
* @return true if at least one subscription channel type is supported by this server false otherwise.
*/
public boolean hasSupportedSubscriptionTypes() {
return CollectionUtils.isNotEmpty(mySupportedSubscriptionTypes);
}
@VisibleForTesting
public void clearSupportedSubscriptionTypesForUnitTest() {
mySupportedSubscriptionTypes.clear();

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,66 @@
package ca.uhn.fhir.jpa.subscription.async;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
/**
* This service is responsible for scheduling a job that will submit messages
* to the subscription processing pipeline at a given interval.
*/
public class AsyncResourceModifiedProcessingSchedulerSvc implements IHasScheduledJobs {
public static final long DEFAULT_SUBMISSION_INTERVAL_IN_MS = 5000;
public long mySubmissionIntervalInMilliSeconds;
public AsyncResourceModifiedProcessingSchedulerSvc() {
this(DEFAULT_SUBMISSION_INTERVAL_IN_MS);
}
public AsyncResourceModifiedProcessingSchedulerSvc(long theSubmissionIntervalInMilliSeconds) {
mySubmissionIntervalInMilliSeconds = theSubmissionIntervalInMilliSeconds;
}
@Override
public void scheduleJobs(ISchedulerService theSchedulerService) {
ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
jobDetail.setId(getClass().getName());
jobDetail.setJobClass(AsyncResourceModifiedProcessingSchedulerSvc.Job.class);
theSchedulerService.scheduleClusteredJob(mySubmissionIntervalInMilliSeconds, jobDetail);
}
public static class Job implements HapiJob {
@Autowired
private AsyncResourceModifiedSubmitterSvc myAsyncResourceModifiedSubmitterSvc;
@Override
public void execute(JobExecutionContext theContext) {
myAsyncResourceModifiedSubmitterSvc.runDeliveryPass();
}
}
}

View File

@ -0,0 +1,67 @@
package ca.uhn.fhir.jpa.subscription.async;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* The purpose of this service is to submit messages to the processing pipeline for which previous attempts at
* submission has failed. See also {@link AsyncResourceModifiedProcessingSchedulerSvc} and {@link IResourceModifiedMessagePersistenceSvc}.
*
*/
public class AsyncResourceModifiedSubmitterSvc {
private static final Logger ourLog = LoggerFactory.getLogger(AsyncResourceModifiedSubmitterSvc.class);
private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
private final IResourceModifiedConsumerWithRetries myResourceModifiedConsumer;
public AsyncResourceModifiedSubmitterSvc(
IResourceModifiedMessagePersistenceSvc theResourceModifiedMessagePersistenceSvc,
IResourceModifiedConsumerWithRetries theResourceModifiedConsumer) {
myResourceModifiedMessagePersistenceSvc = theResourceModifiedMessagePersistenceSvc;
myResourceModifiedConsumer = theResourceModifiedConsumer;
}
public void runDeliveryPass() {
List<IPersistedResourceModifiedMessage> allPersistedResourceModifiedMessages =
myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime();
ourLog.debug(
"Attempting to submit {} resources to consumer channel.", allPersistedResourceModifiedMessages.size());
for (IPersistedResourceModifiedMessage persistedResourceModifiedMessage :
allPersistedResourceModifiedMessages) {
boolean wasProcessed =
myResourceModifiedConsumer.submitPersisedResourceModifiedMessage(persistedResourceModifiedMessage);
if (!wasProcessed) {
break;
}
}
}
}

View File

@ -0,0 +1,50 @@
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.subscription.submit.config;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import static ca.uhn.fhir.jpa.sched.BaseSchedulerServiceImpl.SCHEDULING_DISABLED;
@Configuration
public class SubscriptionMatcherInterceptorConfig {
@Autowired
private Environment myEnvironment;
@Bean
public SubscriptionMatcherInterceptor subscriptionMatcherInterceptor() {
if (isSchedulingDisabledForTests()) {
return new SynchronousSubscriptionMatcherInterceptor();
}
return new SubscriptionMatcherInterceptor();
}
private boolean isSchedulingDisabledForTests() {
String schedulingDisabled = myEnvironment.getProperty(SCHEDULING_DISABLED);
return "true".equals(schedulingDisabled);
}
}

View File

@ -20,14 +20,21 @@
package ca.uhn.fhir.jpa.subscription.submit.config;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc;
import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.model.config.SubscriptionModelConfig;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionQueryValidator;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionValidatingInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.subscription.triggering.SubscriptionTriggeringSvcImpl;
import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -38,14 +45,9 @@ import org.springframework.context.annotation.Lazy;
* matching queue for processing
*/
@Configuration
@Import(SubscriptionModelConfig.class)
@Import({SubscriptionModelConfig.class, SubscriptionMatcherInterceptorConfig.class})
public class SubscriptionSubmitterConfig {
@Bean
public SubscriptionMatcherInterceptor subscriptionMatcherInterceptor() {
return new SubscriptionMatcherInterceptor();
}
@Bean
public SubscriptionValidatingInterceptor subscriptionValidatingInterceptor() {
return new SubscriptionValidatingInterceptor();
@ -67,4 +69,31 @@ public class SubscriptionSubmitterConfig {
public ISubscriptionTriggeringSvc subscriptionTriggeringSvc() {
return new SubscriptionTriggeringSvcImpl();
}
@Bean
public ResourceModifiedSubmitterSvc resourceModifiedSvc(
IHapiTransactionService theHapiTransactionService,
IResourceModifiedMessagePersistenceSvc theResourceModifiedMessagePersistenceSvc,
SubscriptionChannelFactory theSubscriptionChannelFactory,
StorageSettings theStorageSettings) {
return new ResourceModifiedSubmitterSvc(
theStorageSettings,
theSubscriptionChannelFactory,
theResourceModifiedMessagePersistenceSvc,
theHapiTransactionService);
}
@Bean
public AsyncResourceModifiedProcessingSchedulerSvc asyncResourceModifiedProcessingSchedulerSvc() {
return new AsyncResourceModifiedProcessingSchedulerSvc();
}
@Bean
public AsyncResourceModifiedSubmitterSvc asyncResourceModifiedSubmitterSvc(
IResourceModifiedMessagePersistenceSvc theIResourceModifiedMessagePersistenceSvc,
IResourceModifiedConsumerWithRetries theResourceModifiedConsumer) {
return new AsyncResourceModifiedSubmitterSvc(
theIResourceModifiedMessagePersistenceSvc, theResourceModifiedConsumer);
}
}

View File

@ -28,31 +28,26 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
/**
*
* This interceptor is responsible for submitting operations on resources to the subscription pipeline.
*
*/
@Interceptor
public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer {
public class SubscriptionMatcherInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatcherInterceptor.class);
@Autowired
@ -61,16 +56,14 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
@Autowired
private IInterceptorBroadcaster myInterceptorBroadcaster;
@Autowired
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@Autowired
private StorageSettings myStorageSettings;
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private volatile MessageChannel myMatchingChannel;
@Autowired
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
/**
* Constructor
@ -79,122 +72,91 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
super();
}
@EventListener(classes = {ContextRefreshedEvent.class})
public void startIfNeeded() {
if (myStorageSettings.getSupportedSubscriptionTypes().isEmpty()) {
ourLog.debug(
"Subscriptions are disabled on this server. Skipping {} channel creation.",
SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME);
return;
}
if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(
SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings());
}
}
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_CREATED)
public void resourceCreated(IBaseResource theResource, RequestDetails theRequest) {
startIfNeeded();
submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
processResourceModifiedEvent(theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, theRequest);
}
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_DELETED)
public void resourceDeleted(IBaseResource theResource, RequestDetails theRequest) {
startIfNeeded();
submitResourceModified(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
processResourceModifiedEvent(theResource, ResourceModifiedMessage.OperationTypeEnum.DELETE, theRequest);
}
@Hook(Pointcut.STORAGE_PRECOMMIT_RESOURCE_UPDATED)
public void resourceUpdated(IBaseResource theOldResource, IBaseResource theNewResource, RequestDetails theRequest) {
startIfNeeded();
if (!myStorageSettings.isTriggerSubscriptionsForNonVersioningChanges()) {
if (theOldResource != null && theNewResource != null) {
String oldVersion = theOldResource.getIdElement().getVersionIdPart();
String newVersion = theNewResource.getIdElement().getVersionIdPart();
if (isNotBlank(oldVersion) && isNotBlank(newVersion) && oldVersion.equals(newVersion)) {
boolean dontTriggerSubscriptionWhenVersionsAreTheSame =
!myStorageSettings.isTriggerSubscriptionsForNonVersioningChanges();
boolean resourceVersionsAreTheSame = isSameResourceVersion(theOldResource, theNewResource);
if (dontTriggerSubscriptionWhenVersionsAreTheSame && resourceVersionsAreTheSame) {
return;
}
}
}
submitResourceModified(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
processResourceModifiedEvent(theNewResource, ResourceModifiedMessage.OperationTypeEnum.UPDATE, theRequest);
}
/**
* This is an internal API - Use with caution!
*
* This method will create a {@link ResourceModifiedMessage}, persist it and arrange for its delivery to the
* subscription pipeline after the resource was committed. The message is persisted to provide asynchronous submission
* in the event where submission would fail.
*/
@Override
public void submitResourceModified(
protected void processResourceModifiedEvent(
IBaseResource theNewResource,
ResourceModifiedMessage.OperationTypeEnum theOperationType,
RequestDetails theRequest) {
// Even though the resource is being written, the subscription will be interacting with it by effectively
// "reading" it so we set the RequestPartitionId as a read request
RequestPartitionId requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(
theRequest, theNewResource.getIdElement().getResourceType(), theNewResource.getIdElement());
ResourceModifiedMessage msg = new ResourceModifiedMessage(
myFhirContext, theNewResource, theOperationType, theRequest, requestPartitionId);
ResourceModifiedMessage msg = createResourceModifiedMessage(theNewResource, theOperationType, theRequest);
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
HookParams params = new HookParams().add(ResourceModifiedMessage.class, msg);
boolean outcome = CompositeInterceptorBroadcaster.doCallHooks(
myInterceptorBroadcaster, theRequest, Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, params);
if (!outcome) {
return;
}
submitResourceModified(msg);
processResourceModifiedMessage(msg);
}
/**
* This is an internal API - Use with caution!
*/
@Override
public void submitResourceModified(final ResourceModifiedMessage theMsg) {
/*
* We only want to submit the message to the processing queue once the
* transaction is committed. We do this in order to make sure that the
* data is actually in the DB, in case it's the database matcher.
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public int getOrder() {
return 0;
protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
// persist the message for async submission to the processing pipeline. see {@link
// AsyncResourceModifiedProcessingSchedulerSvc}
myResourceModifiedMessagePersistenceSvc.persist(theResourceModifiedMessage);
}
@Override
public void afterCommit() {
sendToProcessingChannel(theMsg);
}
});
} else {
sendToProcessingChannel(theMsg);
}
protected ResourceModifiedMessage createResourceModifiedMessage(
IBaseResource theNewResource,
BaseResourceMessage.OperationTypeEnum theOperationType,
RequestDetails theRequest) {
// Even though the resource is being written, the subscription will be interacting with it by effectively
// "reading" it so we set the RequestPartitionId as a read request
RequestPartitionId requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(
theRequest, theNewResource.getIdElement().getResourceType(), theNewResource.getIdElement());
return new ResourceModifiedMessage(
myFhirContext, theNewResource, theOperationType, theRequest, requestPartitionId);
}
protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Sending resource modified message to processing channel");
Validate.notNull(
myMatchingChannel,
"A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
myMatchingChannel.send(new ResourceModifiedJsonMessage(theMessage));
private boolean isSameResourceVersion(IBaseResource theOldResource, IBaseResource theNewResource) {
if (isNull(theOldResource) || isNull(theNewResource)) {
return false;
}
private ChannelProducerSettings getChannelProducerSettings() {
ChannelProducerSettings channelProducerSettings = new ChannelProducerSettings();
channelProducerSettings.setQualifyChannelName(myStorageSettings.isQualifySubscriptionMatchingChannelName());
return channelProducerSettings;
String oldVersion = theOldResource.getIdElement().getVersionIdPart();
String newVersion = theNewResource.getIdElement().getVersionIdPart();
if (isBlank(oldVersion) || isBlank(newVersion)) {
return false;
}
return oldVersion.equals(newVersion);
}
public void setFhirContext(FhirContext theCtx) {
myFhirContext = theCtx;
}
@VisibleForTesting
public LinkedBlockingChannel getProcessingChannelForUnitTest() {
startIfNeeded();
return (LinkedBlockingChannel) myMatchingChannel;
}
}

View File

@ -0,0 +1,59 @@
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* The purpose of this interceptor is to synchronously submit ResourceModifiedMessage to the
* subscription processing pipeline, ie, as part of processing the operation on a resource.
* It is meant to replace the SubscriptionMatcherInterceptor in integrated tests where
* scheduling is disabled. See {@link AsyncResourceModifiedProcessingSchedulerSvc}
* for further details on asynchronous submissions.
*/
public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatcherInterceptor {
@Autowired
private IResourceModifiedConsumer myResourceModifiedConsumer;
@Override
protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public int getOrder() {
return 0;
}
@Override
public void afterCommit() {
myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
}
});
} else {
myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
}
}
}

View File

@ -0,0 +1,230 @@
package ca.uhn.fhir.jpa.subscription.submit.svc;
/*-
* #%L
* HAPI FHIR Subscription Server
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.r5.model.IdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionCallback;
import java.util.Optional;
import static ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionMatchingSubscriber.SUBSCRIPTION_MATCHING_CHANNEL_NAME;
/**
* This service provides two distinct contexts in which it submits messages to the subscription pipeline.
*
* It implements {@link IResourceModifiedConsumer} for synchronous submissions where retry upon failures is not required.
*
* It implements {@link IResourceModifiedConsumerWithRetries} for synchronous submissions performed as part of processing
* an operation on a resource (see {@link SubscriptionMatcherInterceptor}). Submissions in such context require retries
* upon submission failure.
*
*
*/
public class ResourceModifiedSubmitterSvc implements IResourceModifiedConsumer, IResourceModifiedConsumerWithRetries {
private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedSubmitterSvc.class);
private volatile MessageChannel myMatchingChannel;
private final StorageSettings myStorageSettings;
private final SubscriptionChannelFactory mySubscriptionChannelFactory;
private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
private final IHapiTransactionService myHapiTransactionService;
@EventListener(classes = {ContextRefreshedEvent.class})
public void startIfNeeded() {
if (!myStorageSettings.hasSupportedSubscriptionTypes()) {
ourLog.debug(
"Subscriptions are disabled on this server. Skipping {} channel creation.",
SUBSCRIPTION_MATCHING_CHANNEL_NAME);
return;
}
if (myMatchingChannel == null) {
myMatchingChannel = mySubscriptionChannelFactory.newMatchingSendingChannel(
SUBSCRIPTION_MATCHING_CHANNEL_NAME, getChannelProducerSettings());
}
}
public ResourceModifiedSubmitterSvc(
StorageSettings theStorageSettings,
SubscriptionChannelFactory theSubscriptionChannelFactory,
IResourceModifiedMessagePersistenceSvc resourceModifiedMessagePersistenceSvc,
IHapiTransactionService theHapiTransactionService) {
myStorageSettings = theStorageSettings;
mySubscriptionChannelFactory = theSubscriptionChannelFactory;
myResourceModifiedMessagePersistenceSvc = resourceModifiedMessagePersistenceSvc;
myHapiTransactionService = theHapiTransactionService;
}
/**
* @inheritDoc
* Submit a message to the broker without retries.
*
* Implementation of the {@link IResourceModifiedConsumer}
*
*/
@Override
public void submitResourceModified(ResourceModifiedMessage theMsg) {
startIfNeeded();
ourLog.trace("Sending resource modified message to processing channel");
Validate.notNull(
myMatchingChannel,
"A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
myMatchingChannel.send(new ResourceModifiedJsonMessage(theMsg));
}
/**
* This method will inflate the ResourceModifiedMessage represented by the IPersistedResourceModifiedMessage and attempts
* to submit it to the subscription processing pipeline.
*
* If submission succeeds, the IPersistedResourceModifiedMessage is deleted and true is returned. In the event where submission
* fails, we return false and the IPersistedResourceModifiedMessage is rollback for later re-submission.
*
* @param thePersistedResourceModifiedMessage A ResourceModifiedMessage in it's IPersistedResourceModifiedMessage that requires submission.
* @return Whether the message was successfully submitted to the broker.
*/
@Override
public boolean submitPersisedResourceModifiedMessage(
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
return myHapiTransactionService
.withSystemRequest()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(doProcessResourceModifiedInTransaction(thePersistedResourceModifiedMessage));
}
/**
* This method is the cornerstone in the submit and retry upon failure mechanism for messages needing submission to the subscription processing pipeline.
* It requires execution in a transaction for rollback of deleting the persistedResourceModifiedMessage pointed to by <code>thePersistedResourceModifiedMessage<code/>
* in the event where submission would fail.
*
* @param thePersistedResourceModifiedMessage the primary key pointing to the persisted version (IPersistedResourceModifiedMessage) of a ResourceModifiedMessage needing submission
* @return true upon successful submission, false otherwise.
*/
protected TransactionCallback<Boolean> doProcessResourceModifiedInTransaction(
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
return theStatus -> {
boolean processed = true;
ResourceModifiedMessage resourceModifiedMessage = null;
try {
// delete the entry to lock the row to ensure unique processing
boolean wasDeleted = deletePersistedResourceModifiedMessage(
thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk());
Optional<ResourceModifiedMessage> optionalResourceModifiedMessage =
inflatePersistedResourceMessage(thePersistedResourceModifiedMessage);
if (wasDeleted && optionalResourceModifiedMessage.isPresent()) {
// the PK did exist and we were able to deleted it, ie, we are the only one processing the message
resourceModifiedMessage = optionalResourceModifiedMessage.get();
submitResourceModified(resourceModifiedMessage);
}
} catch (MessageDeliveryException exception) {
// we encountered an issue when trying to send the message so mark the transaction for rollback
ourLog.error(
"Channel submission failed for resource with id {} matching subscription with id {}. Further attempts will be performed at later time.",
resourceModifiedMessage.getPayloadId(),
resourceModifiedMessage.getSubscriptionId());
processed = false;
theStatus.setRollbackOnly();
}
return processed;
};
}
private Optional<ResourceModifiedMessage> inflatePersistedResourceMessage(
IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
ResourceModifiedMessage resourceModifiedMessage = null;
try {
resourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(
thePersistedResourceModifiedMessage);
} catch (ResourceNotFoundException e) {
IPersistedResourceModifiedMessagePK persistedResourceModifiedMessagePk =
thePersistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk();
IdType idType = new IdType(
thePersistedResourceModifiedMessage.getResourceType(),
persistedResourceModifiedMessagePk.getResourcePid(),
persistedResourceModifiedMessagePk.getResourceVersion());
ourLog.warn(
"Scheduled submission will be ignored since resource {} cannot be found", idType.asStringValue());
}
return Optional.ofNullable(resourceModifiedMessage);
}
private boolean deletePersistedResourceModifiedMessage(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
try {
// delete the entry to lock the row to ensure unique processing
return myResourceModifiedMessagePersistenceSvc.deleteByPK(theResourceModifiedPK);
} catch (ResourceNotFoundException exception) {
ourLog.warn(
"thePersistedResourceModifiedMessage with {} and version {} could not be deleted as it may have already been deleted.",
theResourceModifiedPK.getResourcePid(),
theResourceModifiedPK.getResourceVersion());
// we were not able to delete the pk. this implies that someone else did read/delete the PK and processed
// the message
// successfully before we did.
return false;
}
}
private ChannelProducerSettings getChannelProducerSettings() {
ChannelProducerSettings channelProducerSettings = new ChannelProducerSettings();
channelProducerSettings.setQualifyChannelName(myStorageSettings.isQualifySubscriptionMatchingChannelName());
return channelProducerSettings;
}
public IChannelProducer getProcessingChannelForUnitTest() {
startIfNeeded();
return (IChannelProducer) myMatchingChannel;
}
}

View File

@ -39,7 +39,7 @@ import java.util.function.Function;
* This interceptor can be used for troubleshooting subscription processing. It provides very
* detailed logging about the subscription processing pipeline.
* <p>
* This interceptor loges each step in the processing pipeline with a
* This interceptor logs each step in the processing pipeline with a
* different event code, using the event codes itemized in
* {@link EventCodeEnum}. By default these are each placed in a logger with
* a different name (e.g. <code>ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor.SUBS20</code>
@ -91,7 +91,7 @@ public class SubscriptionDebugLogInterceptor {
}
log(
EventCodeEnum.SUBS1,
"Resource {} was submitted to the processing pipeline (op={})",
"Resource {} is starting the processing pipeline (op={})",
resourceId,
theMessage.getOperationType());
}

View File

@ -1,60 +0,0 @@
package ca.uhn.fhir.jpa.subscription.submit.interceptor;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hl7.fhir.dstu2.model.Subscription.SubscriptionChannelType.RESTHOOK;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SubscriptionMatcherInterceptorTest {
@Mock
StorageSettings myStorageSettings;
@Mock
SubscriptionChannelFactory mySubscriptionChannelFactory;
@InjectMocks
SubscriptionMatcherInterceptor myUnitUnderTest;
@Captor
ArgumentCaptor<ChannelProducerSettings> myArgumentCaptor;
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testMethodStartIfNeeded_withQualifySubscriptionMatchingChannelNameProperty_mayQualifyChannelName(boolean theIsQualifySubMatchingChannelName){
// given
boolean expectedResult = theIsQualifySubMatchingChannelName;
when(myStorageSettings.isQualifySubscriptionMatchingChannelName()).thenReturn(theIsQualifySubMatchingChannelName);
when(myStorageSettings.getSupportedSubscriptionTypes()).thenReturn(Set.of(RESTHOOK));
// when
myUnitUnderTest.startIfNeeded();
// then
ChannelProducerSettings capturedChannelProducerSettings = getCapturedChannelProducerSettings();
assertThat(capturedChannelProducerSettings.isQualifyChannelName(), is(expectedResult));
}
private ChannelProducerSettings getCapturedChannelProducerSettings(){
verify(mySubscriptionChannelFactory).newMatchingSendingChannel(anyString(), myArgumentCaptor.capture());
return myArgumentCaptor.getValue();
}
}

View File

@ -6,12 +6,14 @@ import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.hl7.fhir.dstu2.model.Subscription;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -21,6 +23,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.PlatformTransactionManager;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@ -34,24 +37,12 @@ import static org.mockito.Mockito.verify;
})
public class SubscriptionSubmitInterceptorLoaderTest {
@MockBean
private ISearchParamProvider mySearchParamProvider;
@MockBean
private IInterceptorService myInterceptorService;
@MockBean
private IValidationSupport myValidationSupport;
@MockBean
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@MockBean
private DaoRegistry myDaoRegistry;
@Autowired
private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@MockBean
private IResourceVersionSvc myResourceVersionSvc;
@MockBean
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private IInterceptorService myInterceptorService;
/**
* It should be possible to run only the {@link SubscriptionSubmitterConfig} without the
@ -82,6 +73,25 @@ public class SubscriptionSubmitInterceptorLoaderTest {
return storageSettings;
}
@MockBean
private ISearchParamProvider mySearchParamProvider;
@MockBean
private IValidationSupport myValidationSupport;
@MockBean
private SubscriptionChannelFactory mySubscriptionChannelFactory;
@MockBean
private DaoRegistry myDaoRegistry;
@MockBean
private IResourceVersionSvc myResourceVersionSvc;
@MockBean
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@MockBean
private PlatformTransactionManager myPlatformTransactionManager;
@MockBean
private IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
@MockBean
private IHapiTransactionService myHapiTransactionService;
}

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.SearchParamMatcher;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.r5.model.Encounter;
import org.hl7.fhir.r5.model.IdType;
import org.hl7.fhir.r5.model.SubscriptionTopic;

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.dao.SearchBuilderFactory;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.search.builder.SearchBuilder;
import ca.uhn.fhir.jpa.svc.MockHapiTransactionService;
import ca.uhn.fhir.jpa.util.BaseIterator;
import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;

View File

@ -210,7 +210,6 @@ public class RestHookTestWithInterceptorRegisteredToStorageSettingsDstu2Test ext
Subscription subscription1 = createSubscription(criteria1, payload, ourListenerServerBase);
Subscription subscription2 = createSubscription(criteria2, payload, ourListenerServerBase);
runInTransaction(() -> {
ourLog.info("All token indexes:\n * {}", myResourceIndexedSearchParamTokenDao.findAll().stream().map(t -> t.toString()).collect(Collectors.joining("\n * ")));
});

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -42,6 +42,7 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.ClasspathUtil;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
@ -447,7 +448,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
String name = "profiles-resources";
ourLog.info("Uploading " + name);
String vsContents;
vsContents = IOUtils.toString(FhirResourceDaoDstu3Test.class.getResourceAsStream("/org/hl7/fhir/dstu3/model/profile/" + name + ".xml"), StandardCharsets.UTF_8);
vsContents = ClasspathUtil.loadResource("/org/hl7/fhir/dstu3/model/profile/" + name + ".xml");
bundle = myFhirContext.newXmlParser().parseResource(org.hl7.fhir.dstu3.model.Bundle.class, vsContents);
for (BundleEntryComponent i : bundle.getEntry()) {

View File

@ -242,7 +242,7 @@ public class FhirSystemDaoDstu3Test extends BaseJpaDstu3SystemTest {
}
private Bundle loadBundle(String theFileName) throws IOException {
String req = IOUtils.toString(FhirSystemDaoDstu3Test.class.getResourceAsStream(theFileName), StandardCharsets.UTF_8);
String req = ClasspathUtil.loadResource(theFileName);
return myFhirContext.newXmlParser().parseResource(Bundle.class, req);
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -604,6 +604,47 @@ public class BulkExportUseCaseTest extends BaseResourceProviderR4Test {
@Nested
public class GroupBulkExportTests {
@Test
public void testGroupExportSuccessfulyExportsPatientForwardReferences() {
BundleBuilder bb = new BundleBuilder(myFhirContext);
Group group = new Group();
group.setId("Group/G");
group.setActive(true);
bb.addTransactionUpdateEntry(group);
Practitioner pract = new Practitioner();
pract.setId("PRACT-IN-GROUP");
bb.addTransactionUpdateEntry(pract);
Organization organization = new Organization();
organization.setId("ORG-IN-GROUP");
bb.addTransactionUpdateEntry(organization);
Patient patient = new Patient();
patient.setId("PAT-IN-GROUP");
patient.setGender(Enumerations.AdministrativeGender.FEMALE);
patient.setActive(true);
patient.setManagingOrganization(new Reference("Organization/ORG-IN-GROUP"));
patient.setGeneralPractitioner(List.of(new Reference("Practitioner/PRACT-IN-GROUP")));
bb.addTransactionUpdateEntry(patient);
group.addMember().getEntity().setReference("Patient/PAT-IN-GROUP");
myClient.transaction().withBundle(bb.getBundle()).execute();
HashSet<String> resourceTypes = Sets.newHashSet();
BulkExportJobResults bulkExportJobResults = startGroupBulkExportJobAndAwaitCompletion(resourceTypes, new HashSet<>(), "G");
Map<String, List<IBaseResource>> firstMap = convertJobResultsToResources(bulkExportJobResults);
assertThat(firstMap.keySet(), hasSize(4));
assertThat(firstMap.get("Group"), hasSize(1));
assertThat(firstMap.get("Patient"), hasSize(1));
assertThat(firstMap.get("Practitioner"), hasSize(1));
assertThat(firstMap.get("Organization"), hasSize(1));
}
@Test
public void testVeryLargeGroup() {

View File

@ -16,8 +16,8 @@ import ca.uhn.fhir.jpa.model.dao.JpaPid;
import ca.uhn.fhir.jpa.model.entity.ForcedId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.search.MockHapiTransactionService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.svc.MockHapiTransactionService;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;

View File

@ -25,7 +25,7 @@ import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.search.PersistedJpaSearchFirstPageBundleProvider;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.jpa.term.TermReadSvcImpl;
import ca.uhn.fhir.jpa.util.SqlQuery;
@ -126,6 +126,7 @@ import static org.mockito.Mockito.when;
@SuppressWarnings("JavadocBlankLines")
@TestMethodOrder(MethodOrderer.MethodName.class)
public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test {
@RegisterExtension
@Order(0)
public static final RestfulServerExtension ourServer = new RestfulServerExtension(FhirContext.forR4Cached())
@ -139,7 +140,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
@Autowired
private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
private ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;;
@Autowired
private ReindexStep myReindexStep;
@Autowired
@ -3090,7 +3091,7 @@ public class FhirResourceDaoR4QueryCountTest extends BaseResourceProviderR4Test
// Setup
myStorageSettings.addSupportedSubscriptionType(org.hl7.fhir.dstu2.model.Subscription.SubscriptionChannelType.RESTHOOK);
mySubscriptionMatcherInterceptor.startIfNeeded();
myResourceModifiedSubmitterSvc.startIfNeeded();
for (int i = 0; i < 10; i++) {
createPatient(withActiveTrue());

View File

@ -0,0 +1,100 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.storage.test.TagTestCasesUtil;
import org.hl7.fhir.r4.model.Meta;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import static ca.uhn.fhir.test.utilities.TagTestUtil.createMeta;
import static ca.uhn.fhir.test.utilities.TagTestUtil.generateAllCodingPairs;
public class FhirResourceDaoR4TagsOrderTest extends BaseJpaR4Test {
private TagTestCasesUtil myTagTestCasesUtil;
@Override
@BeforeEach
protected void before() throws Exception {
super.before();
myTagTestCasesUtil = new TagTestCasesUtil(myPatientDao, mySystemDao, mySrd, true);
}
@ParameterizedTest
@EnumSource(JpaStorageSettings.TagStorageModeEnum.class)
public void testCreateResource_ExpectToRetrieveTagsSorted(JpaStorageSettings.TagStorageModeEnum theTagStorageMode) {
myStorageSettings.setTagStorageMode(theTagStorageMode);
// TODO: In inline mode, $meta endpoint doesn't return tags, see https://github.com/hapifhir/hapi-fhir/issues/5206
// When this issue is fixed, the following line could be removed so that we check $meta for Inline mode as well
myTagTestCasesUtil.setMetaOperationSupported(theTagStorageMode != JpaStorageSettings.TagStorageModeEnum.INLINE);
myTagTestCasesUtil.createResourceWithTagsAndExpectToRetrieveThemSorted();
}
@ParameterizedTest
@EnumSource(
// running this test for tag storage modes other than INLINE mode, since INLINE mode replaces the tags and security labels
// on update rather than adding them to the existing set. The INLINE mode has its own test below.
value = JpaStorageSettings.TagStorageModeEnum.class,
names = {"INLINE"},
mode = EnumSource.Mode.EXCLUDE)
public void testUpdateResource_ShouldNotIncreaseVersionBecauseOfTagOrder_NonInlineModes(JpaStorageSettings.TagStorageModeEnum theTagStorageMode) {
myStorageSettings.setTagStorageMode(theTagStorageMode);
myTagTestCasesUtil.updateResourceWithExistingTagsButInDifferentOrderAndExpectVersionToRemainTheSame_NonInlineModes();
}
@Test
public void testUpdateResource_ShouldNotIncreaseVersionBecauseOfTagOrder_InlineMode() {
myStorageSettings.setTagStorageMode(JpaStorageSettings.TagStorageModeEnum.INLINE);
myTagTestCasesUtil.updateResourceWithExistingTagsButInDifferentOrderAndExpectVersionToRemainTheSame_InlineMode();
}
@ParameterizedTest
@EnumSource(
// running this test for tag storage modes other than INLINE mode, since INLINE mode replaces the tags and security labels
// on update rather than adding them to the existing set. The INLINE mode has its own test below.
value = JpaStorageSettings.TagStorageModeEnum.class,
names = {"INLINE"},
mode = EnumSource.Mode.EXCLUDE)
public void testUpdateResource_ExpectToRetrieveTagsSorted_NonInlineModes(JpaStorageSettings.TagStorageModeEnum theTagStorageMode) {
myStorageSettings.setTagStorageMode(theTagStorageMode);
myTagTestCasesUtil.updateResourceWithTagsAndExpectToRetrieveTagsSorted_NonInlineModes();
}
@Test
public void testUpdateResource_ExpectToRetrieveTagsSorted_InlineMode() {
myStorageSettings.setTagStorageMode(JpaStorageSettings.TagStorageModeEnum.INLINE);
// TODO: In inline mode, $meta endpoint doesn't return tags, see https://github.com/hapifhir/hapi-fhir/issues/5206
// When this issue is fixed, the following line could be removed so that we check $meta for Inline mode as well
myTagTestCasesUtil.setMetaOperationSupported(false);
Meta metaInputOnCreate = createMeta(
// generateAllCodingPairs creates a list that has 6 codings in this case in this order:
// (sys2, c), (sys2, b), (sys2, a), (sys1, c), (sys1, b), (sys1, a)
generateAllCodingPairs(List.of("sys2", "sys1"), List.of("c", "b", "a")), //tag
generateAllCodingPairs(List.of("sys2", "sys1"), List.of("c", "b", "a")), //security
List.of("c", "b", "a") // profile
);
// meta input for update (adding new tags)
Meta metaInputOnUpdate = createMeta(
generateAllCodingPairs(List.of("sys2", "sys1"), List.of("cc", "bb", "aa")), //tag
generateAllCodingPairs(List.of("sys2", "sys1"), List.of("cc", "bb", "aa")), //security
List.of("cc", "bb", "aa") //profile
);
// inline mode replaces the tags completely on update, so only new tags are expected after update
Meta expectedMetaAfterUpdate = createMeta(
generateAllCodingPairs(List.of("sys1", "sys2"), List.of("aa", "bb", "cc")), //tag (replaced & sorted)
generateAllCodingPairs(List.of("sys1", "sys2"), List.of("aa", "bb", "cc")), //security (replaced & sorted)
List.of("aa", "bb", "cc") //profile (replaced & sorted)
);
myTagTestCasesUtil.updateResourceAndVerifyMeta(metaInputOnCreate, metaInputOnUpdate, expectedMetaAfterUpdate, false);
}
}

View File

@ -50,6 +50,7 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.ClasspathUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
@ -814,7 +815,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
String name = "profiles-resources";
ourLog.info("Uploading " + name);
String vsContents;
vsContents = IOUtils.toString(FhirResourceDaoR4Test.class.getResourceAsStream("/org/hl7/fhir/r4/model/profile/" + name + ".xml"), StandardCharsets.UTF_8);
vsContents = ClasspathUtil.loadResource("/org/hl7/fhir/r4/model/profile/" + name + ".xml");
bundle = myFhirContext.newXmlParser().parseResource(org.hl7.fhir.r4.model.Bundle.class, vsContents);
for (BundleEntryComponent i : bundle.getEntry()) {

View File

@ -2333,7 +2333,7 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
@Test
public void testTransactionCreateWithPutUsingUrl2() throws Exception {
String req = IOUtils.toString(FhirSystemDaoR4Test.class.getResourceAsStream("/r4/bundle.xml"), StandardCharsets.UTF_8);
String req = ClasspathUtil.loadResource("/r4/bundle.xml");
Bundle request = myFhirContext.newXmlParser().parseResource(Bundle.class, req);
mySystemDao.transaction(mySrd, request);
}

View File

@ -237,6 +237,31 @@ public class GraphQLR4Test extends BaseResourceProviderR4Test {
myCaptureQueriesListener.logSelectQueries();
}
@Test
public void testId_Search_Patient() throws IOException {
initTestPatients();
String query = "{PatientList(_id: " + myPatientId0.getIdPart() + ") {id}}";
HttpGet httpGet = new HttpGet(myServerBase + "/$graphql?query=" + UrlUtil.escapeUrlParam(query));
try (CloseableHttpResponse response = ourHttpClient.execute(httpGet)) {
String resp = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
ourLog.info(resp);
@Language("json")
String expected = """
{
"PatientList":[{
"id":" """ + myPatientId0 + """
/_history/1"
}]
}""";
assertEquals(TestUtil.stripWhitespace(DATA_PREFIX +
expected +
DATA_SUFFIX), TestUtil.stripWhitespace(resp));
}
}
private void initTestPatients() {
Patient p = new Patient();
p.addName()

View File

@ -43,6 +43,8 @@ import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.ClasspathUtil;
import ca.uhn.fhir.util.IMetaTagSorter;
import ca.uhn.fhir.util.MetaTagSorterAlphabetical;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.validation.IInstanceValidatorModule;
import com.google.common.collect.Lists;
@ -149,6 +151,7 @@ public class GiantTransactionPerfTest {
private IIdHelperService myIdHelperService;
@Mock
private IJpaStorageResourceParser myJpaStorageResourceParser;
private IMetaTagSorter myMetaTagSorter;
@AfterEach
public void afterEach() {
@ -175,6 +178,8 @@ public class GiantTransactionPerfTest {
myPartitionSettings = new PartitionSettings();
myMetaTagSorter = new MetaTagSorterAlphabetical();
myHapiTransactionService = new HapiTransactionService();
myHapiTransactionService.setTransactionManager(myTransactionManager);
myHapiTransactionService.setInterceptorBroadcaster(myInterceptorSvc);
@ -267,6 +272,7 @@ public class GiantTransactionPerfTest {
myEobDao.setPartitionSettingsForUnitTest(myPartitionSettings);
myEobDao.setJpaStorageResourceParserForUnitTest(myJpaStorageResourceParser);
myEobDao.setExternallyStoredResourceServiceRegistryForUnitTest(new ExternallyStoredResourceServiceRegistry());
myEobDao.setMyMetaTagSorter(myMetaTagSorter);
myEobDao.start();
myDaoRegistry.setResourceDaos(Lists.newArrayList(myEobDao));

View File

@ -2,12 +2,14 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.test.utilities.server.TransactionCapturingProviderExtension;
@ -61,7 +63,11 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
protected SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
protected ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
@Autowired
protected IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
@Autowired
protected IResourceModifiedDao myResourceModifiedDao;
protected CountingInterceptor myCountingInterceptor;
protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@Autowired
@ -84,6 +90,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
myStorageSettings.setAllowMultipleDelete(new JpaStorageSettings().isAllowMultipleDelete());
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
myResourceModifiedDao.deleteAll();
}
@BeforeEach
@ -102,7 +109,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
waitForActivatedSubscriptionCount(0);
}
LinkedBlockingChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
LinkedBlockingChannel processingChannel = (LinkedBlockingChannel) myResourceModifiedSubmitterSvc.getProcessingChannelForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}

View File

@ -0,0 +1,158 @@
package ca.uhn.fhir.jpa.subscription.async;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.message.TestQueueConsumerHandler;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ContextConfiguration(classes = {AsyncSubscriptionMessageSubmissionIT.SpringConfig.class})
public class AsyncSubscriptionMessageSubmissionIT extends BaseSubscriptionsR4Test {
@SpyBean
IResourceModifiedConsumer myResourceModifiedConsumer;
@Autowired
AsyncResourceModifiedSubmitterSvc myAsyncResourceModifiedSubmitterSvc;
@Autowired
private SubscriptionChannelFactory myChannelFactory;
@Autowired SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
private TestQueueConsumerHandler<ResourceModifiedJsonMessage> myQueueConsumerHandler;
@AfterEach
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges());
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
}
@BeforeEach
public void beforeRegisterRestHookListenerAndSchedulePoisonPillInterceptor() {
mySubscriptionTestUtil.registerMessageInterceptor();
IChannelReceiver receiver = myChannelFactory.newMatchingReceivingChannel("my-queue-name", new ChannelConsumerSettings());
myQueueConsumerHandler = new TestQueueConsumerHandler();
receiver.subscribe(myQueueConsumerHandler);
myStorageSettings.setTagStorageMode(JpaStorageSettings.TagStorageModeEnum.NON_VERSIONED);
}
@Test
public void testSpringInjects_BeanOfTypeSubscriptionMatchingInterceptor_whenBeanDeclarationIsOverwrittenLocally(){
assertFalse(mySubscriptionMatcherInterceptor instanceof SynchronousSubscriptionMatcherInterceptor);
}
@Test
// the purpose of this test is to assert that a resource matching a given subscription is
// delivered asynchronously to the subscription processing pipeline.
public void testAsynchronousDeliveryOfResourceMatchingASubscription_willSucceed() throws Exception {
String aCode = "zoop";
String aSystem = "SNOMED-CT";
// given
createAndSubmitSubscriptionWithCriteria("[Observation]");
waitForActivatedSubscriptionCount(1);
// when
Observation obs = sendObservation(aCode, aSystem);
assertCountOfResourcesNeedingSubmission(2); // the subscription and the observation
assertCountOfResourcesReceivedAtSubscriptionTerminalEndpoint(0);
// since scheduled tasks are disabled during tests, let's trigger a submission
// just like the AsyncResourceModifiedProcessingSchedulerSvc would.
myAsyncResourceModifiedSubmitterSvc.runDeliveryPass();
//then
waitForQueueToDrain();
assertCountOfResourcesNeedingSubmission(0);
assertCountOfResourcesReceivedAtSubscriptionTerminalEndpoint(1);
Observation observation = (Observation) fetchSingleResourceFromSubscriptionTerminalEndpoint();
Coding coding = observation.getCode().getCodingFirstRep();
assertThat(coding.getCode(), equalTo(aCode));
assertThat(coding.getSystem(), equalTo(aSystem));
}
private void assertCountOfResourcesNeedingSubmission(int theExpectedCount) {
assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(), hasSize(theExpectedCount));
}
private Subscription createAndSubmitSubscriptionWithCriteria(String theCriteria) {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria(theCriteria);
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
channel.setPayload("application/fhir+json");
channel.setEndpoint("channel:my-queue-name");
subscription.setChannel(channel);
postOrPutSubscription(subscription);
myAsyncResourceModifiedSubmitterSvc.runDeliveryPass();
return subscription;
}
private IBaseResource fetchSingleResourceFromSubscriptionTerminalEndpoint() {
assertThat(myQueueConsumerHandler.getMessages().size(), is(equalTo(1)));
ResourceModifiedJsonMessage resourceModifiedJsonMessage = myQueueConsumerHandler.getMessages().get(0);
ResourceModifiedMessage payload = resourceModifiedJsonMessage.getPayload();
String payloadString = payload.getPayloadString();
IBaseResource resource = myFhirContext.newJsonParser().parseResource(payloadString);
myQueueConsumerHandler.clearMessages();
return resource;
}
private void assertCountOfResourcesReceivedAtSubscriptionTerminalEndpoint(int expectedCount) {
assertThat(myQueueConsumerHandler.getMessages(), hasSize(expectedCount));
}
@Configuration
public static class SpringConfig {
@Primary
@Bean
public SubscriptionMatcherInterceptor subscriptionMatcherInterceptor() {
return new SubscriptionMatcherInterceptor();
}
}
}

View File

@ -1,6 +1,11 @@
package ca.uhn.fhir.jpa.subscription.message;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
@ -11,20 +16,28 @@ import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscrib
import ca.uhn.fhir.rest.client.api.Header;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.AdditionalRequestHeadersInterceptor;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
import java.util.stream.Collectors;
@ -49,6 +62,12 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(MessageSubscriptionR4Test.class);
private TestQueueConsumerHandler<ResourceModifiedJsonMessage> handler;
@Autowired
IResourceModifiedDao myResourceModifiedDao;
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@ -176,6 +195,109 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
}
@Test
public void testMethodFindAllOrdered_willReturnAllPersistedResourceModifiedMessagesOrderedByCreatedTime(){
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
// given
Patient patient = sendPatient();
Organization organization = sendOrganization();
ResourceModifiedMessage patientResourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, patient, BaseResourceMessage.OperationTypeEnum.CREATE);
ResourceModifiedMessage organizationResourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, organization, BaseResourceMessage.OperationTypeEnum.CREATE);
IPersistedResourceModifiedMessage patientPersistedMessage = myResourceModifiedMessagePersistenceSvc.persist(patientResourceModifiedMessage);
IPersistedResourceModifiedMessage organizationPersistedMessage = myResourceModifiedMessagePersistenceSvc.persist(organizationResourceModifiedMessage);
// when
List<IPersistedResourceModifiedMessage> allPersisted = myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime();
// then
assertOnPksAndOrder(allPersisted, List.of(patientPersistedMessage, organizationPersistedMessage));
}
@Test
public void testMethodDeleteByPK_whenEntityExists_willDeleteTheEntityAndReturnTrue(){
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
// given
TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
Patient patient = sendPatient();
ResourceModifiedMessage patientResourceModifiedMessage = new ResourceModifiedMessage(myFhirContext, patient, BaseResourceMessage.OperationTypeEnum.CREATE);
IPersistedResourceModifiedMessage persistedResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.persist(patientResourceModifiedMessage);
// when
boolean wasDeleted = transactionTemplate.execute(tx -> myResourceModifiedMessagePersistenceSvc.deleteByPK(persistedResourceModifiedMessage.getPersistedResourceModifiedMessagePk()));
// then
assertThat(wasDeleted, is(Boolean.TRUE));
assertThat(myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(), hasSize(0));
}
@Test
public void testMethodDeleteByPK_whenEntityDoesNotExist_willReturnFalse(){
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
// given
TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
IPersistedResourceModifiedMessagePK nonExistentResourceWithPk = PersistedResourceModifiedMessageEntityPK.with("one", "one");
// when
boolean wasDeleted = transactionTemplate.execute(tx -> myResourceModifiedMessagePersistenceSvc.deleteByPK(nonExistentResourceWithPk));
// then
assertThat(wasDeleted, is(Boolean.FALSE));
}
@Test
public void testPersistedResourceModifiedMessage_whenFetchFromDb_willEqualOriginalMessage() throws JsonProcessingException {
mySubscriptionTestUtil.unregisterSubscriptionInterceptor();
// given
TransactionTemplate transactionTemplate = new TransactionTemplate(myTxManager);
Observation obs = sendObservation("zoop", "SNOMED-CT", "theExplicitSource", "theRequestId");
ResourceModifiedMessage originalResourceModifiedMessage = createResourceModifiedMessage(obs);
transactionTemplate.execute(tx -> {
IPersistedResourceModifiedMessage persistedResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.persist(originalResourceModifiedMessage);
// when
ResourceModifiedMessage restoredResourceModifiedMessage = myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(persistedResourceModifiedMessage);
// then
assertEquals(toJson(originalResourceModifiedMessage), toJson(restoredResourceModifiedMessage));
assertEquals(originalResourceModifiedMessage, restoredResourceModifiedMessage);
return null;
});
}
private ResourceModifiedMessage createResourceModifiedMessage(Observation theObservation){
ResourceModifiedMessage retVal = new ResourceModifiedMessage(myFhirContext, theObservation, BaseResourceMessage.OperationTypeEnum.CREATE);
retVal.setSubscriptionId("subId");
retVal.setTransactionId("txId");
retVal.setMessageKey("messageKey");
retVal.setMediaType("json");
retVal.setAttribute("attKey", "attValue");
retVal.setPartitionId(RequestPartitionId.allPartitions());
return retVal;
}
private static void assertEquals(ResourceModifiedMessage theMsg, ResourceModifiedMessage theComparedTo){
assertThat(theMsg.getPayloadId(), equalTo(theComparedTo.getPayloadId()));
assertThat(theMsg.getOperationType(), equalTo(theComparedTo.getOperationType()));
assertThat(theMsg.getPayloadString(), equalTo(theComparedTo.getPayloadString()));
assertThat(theMsg.getSubscriptionId(), equalTo(theComparedTo.getSubscriptionId()));
assertThat(theMsg.getMediaType(), equalTo(theComparedTo.getMediaType()));
assertThat(theMsg.getMessageKeyOrNull(), equalTo(theComparedTo.getMessageKeyOrNull()));
assertThat(theMsg.getTransactionId(), equalTo(theComparedTo.getTransactionId()));
assertThat(theMsg.getAttributes(), equalTo(theComparedTo.getAttributes()));
}
private void maybeAddHeaderInterceptor(IGenericClient theClient, List<Header> theHeaders) {
if(theHeaders.isEmpty()){
return;
@ -215,4 +337,32 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
return (T) resource;
}
private static void assertEquals(String theMsg, String theComparedTo){
assertThat(theMsg, equalTo(theComparedTo));
}
private static String toJson(Object theRequest) {
try {
return new ObjectMapper().writer().writeValueAsString(theRequest);
} catch (JsonProcessingException theE) {
throw new AssertionError("Failure during serialization: " + theE);
}
}
private static void assertOnPksAndOrder(List<IPersistedResourceModifiedMessage> theFetchedResourceModifiedMessageList, List<IPersistedResourceModifiedMessage> theCompareToList ){
assertThat(theFetchedResourceModifiedMessageList, hasSize(theCompareToList.size()));
List<IPersistedResourceModifiedMessagePK> fetchedPks = theFetchedResourceModifiedMessageList
.stream()
.map(IPersistedResourceModifiedMessage::getPersistedResourceModifiedMessagePk)
.collect(Collectors.toList());
List<IPersistedResourceModifiedMessagePK> compareToPks = theCompareToList
.stream()
.map(IPersistedResourceModifiedMessage::getPersistedResourceModifiedMessagePk)
.collect(Collectors.toList());
Assertions.assertEquals(fetchedPks, compareToPks);
}
}

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
@ -52,7 +52,7 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc
@Autowired
private SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
private ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
@AfterEach
public void afterUnregisterRestHookListener() {
@ -63,7 +63,7 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc
@BeforeEach
public void beforeSetSubscriptionActivatingInterceptor() {
myStorageSettings.addSupportedSubscriptionType(org.hl7.fhir.dstu2.model.Subscription.SubscriptionChannelType.RESTHOOK);
mySubscriptionMatcherInterceptor.startIfNeeded();
myResourceModifiedSubmitterSvc.startIfNeeded();
mySubscriptionLoader.doSyncSubscriptionsForUnitTest();
}

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicDispatcher;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
@ -31,6 +32,7 @@ import org.hl7.fhir.r4.model.SearchParameter;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -64,6 +66,9 @@ import static org.junit.jupiter.api.Assertions.fail;
public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(RestHookTestR4Test.class);
@Autowired
ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@Autowired(required = false)
@ -113,7 +118,6 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals("IN_MEMORY", subscription.getMeta().getTag().get(0).getCode());
}
@Test
public void testRestHookSubscriptionApplicationFhirJson() throws Exception {
String payload = "application/fhir+json";

View File

@ -0,0 +1,141 @@
package ca.uhn.fhir.jpa.subscription.svc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.svc.MockHapiTransactionService;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.SimpleTransactionStatus;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class ResourceModifiedSubmitterSvcTest {
@Mock
StorageSettings myStorageSettings;
@Mock
SubscriptionChannelFactory mySubscriptionChannelFactory;
@Mock
IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc;
@Captor
ArgumentCaptor<ChannelProducerSettings> myArgumentCaptor;
@Mock
IChannelProducer myChannelProducer;
ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
TransactionStatus myCapturingTransactionStatus;
@BeforeEach
public void beforeEach(){
myCapturingTransactionStatus = new SimpleTransactionStatus();
lenient().when(myStorageSettings.hasSupportedSubscriptionTypes()).thenReturn(true);
lenient().when(mySubscriptionChannelFactory.newMatchingSendingChannel(anyString(), any())).thenReturn(myChannelProducer);
IHapiTransactionService hapiTransactionService = new MockHapiTransactionService(myCapturingTransactionStatus);
myResourceModifiedSubmitterSvc = new ResourceModifiedSubmitterSvc(
myStorageSettings,
mySubscriptionChannelFactory,
myResourceModifiedMessagePersistenceSvc,
hapiTransactionService);
}
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testMethodStartIfNeeded_withQualifySubscriptionMatchingChannelNameProperty_mayQualifyChannelName(boolean theIsQualifySubMatchingChannelName){
// given
boolean expectedResult = theIsQualifySubMatchingChannelName;
when(myStorageSettings.isQualifySubscriptionMatchingChannelName()).thenReturn(theIsQualifySubMatchingChannelName);
// when
myResourceModifiedSubmitterSvc.startIfNeeded();
// then
ChannelProducerSettings capturedChannelProducerSettings = getCapturedChannelProducerSettings();
assertThat(capturedChannelProducerSettings.isQualifyChannelName(), is(expectedResult));
}
@Test
public void testSubmitPersisedResourceModifiedMessage_withExistingPersistedResourceModifiedMessage_willSucceed(){
// given
// a successful deletion implies that the message did exist.
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true);
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
// when
boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity());
// then
assertThat(wasProcessed, is(Boolean.TRUE));
assertThat(myCapturingTransactionStatus.isRollbackOnly(), is(Boolean.FALSE));
verify(myChannelProducer, times(1)).send(any());
}
@Test
public void testSubmitPersisedResourceModifiedMessage_whenMessageWasAlreadyProcess_willSucceed(){
// given
// deletion fails, someone else was faster and processed the message
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(false);
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
// when
boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity());
// then
assertThat(wasProcessed, is(Boolean.TRUE));
assertThat(myCapturingTransactionStatus.isRollbackOnly(), is(Boolean.FALSE));
// we do not send a message which was already sent
verify(myChannelProducer, times(0)).send(any());
}
@Test
public void testSubmitPersisedResourceModifiedMessage_whitErrorOnSending_willRollbackDeletion(){
// given
when(myResourceModifiedMessagePersistenceSvc.deleteByPK(any())).thenReturn(true);
when(myResourceModifiedMessagePersistenceSvc.inflatePersistedResourceModifiedMessage(any())).thenReturn(new ResourceModifiedMessage());
// simulate failure writing to the channel
when(myChannelProducer.send(any())).thenThrow(new MessageDeliveryException("sendingError"));
// when
boolean wasProcessed = myResourceModifiedSubmitterSvc.submitPersisedResourceModifiedMessage(new ResourceModifiedEntity());
// then
assertThat(wasProcessed, is(Boolean.FALSE));
assertThat(myCapturingTransactionStatus.isRollbackOnly(), is(Boolean.TRUE));
}
private ChannelProducerSettings getCapturedChannelProducerSettings(){
verify(mySubscriptionChannelFactory).newMatchingSendingChannel(anyString(), myArgumentCaptor.capture());
return myArgumentCaptor.getValue();
}
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.provider.r4b.BaseResourceProviderR4BTest;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
@ -63,7 +63,7 @@ public abstract class BaseSubscriptionsR4BTest extends BaseResourceProviderR4BTe
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
protected SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
protected ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
protected CountingInterceptor myCountingInterceptor;
protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@Autowired
@ -104,12 +104,12 @@ public abstract class BaseSubscriptionsR4BTest extends BaseResourceProviderR4BTe
waitForActivatedSubscriptionCount(0);
}
LinkedBlockingChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
myCountingInterceptor = new CountingInterceptor();
LinkedBlockingChannel processingChannel = (LinkedBlockingChannel) myResourceModifiedSubmitterSvc.getProcessingChannelForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}
myCountingInterceptor = new CountingInterceptor();
if (processingChannel != null) {
processingChannel.addInterceptor(myCountingInterceptor);
}
}

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.CacheControlDirective;
@ -25,6 +26,7 @@ import org.hl7.fhir.r4b.model.SearchParameter;
import org.hl7.fhir.r4b.model.StringType;
import org.hl7.fhir.r4b.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -56,6 +58,9 @@ import static org.junit.jupiter.api.Assertions.fail;
public class RestHookTestR4BTest extends BaseSubscriptionsR4BTest {
private static final Logger ourLog = LoggerFactory.getLogger(RestHookTestR4BTest.class);
@Autowired
ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -12,7 +12,7 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.jpa.subscription.model.CanonicalTopicSubscriptionFilter;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicLoader;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicRegistry;
@ -73,7 +73,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
protected SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
protected ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
protected CountingInterceptor myCountingInterceptor;
protected List<IIdType> mySubscriptionIds = Collections.synchronizedList(new ArrayList<>());
@Autowired
@ -110,7 +110,7 @@ public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test
waitForActivatedSubscriptionCount(0);
}
LinkedBlockingChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
LinkedBlockingChannel processingChannel = (LinkedBlockingChannel) myResourceModifiedSubmitterSvc.getProcessingChannelForUnitTest();
if (processingChannel != null) {
processingChannel.clearInterceptorsForUnitTest();
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.9.3-SNAPSHOT</version>
<version>6.9.4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -17,9 +17,10 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.search;
package ca.uhn.fhir.jpa.svc;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.SimpleTransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
@ -27,9 +28,19 @@ import javax.annotation.Nullable;
public class MockHapiTransactionService extends HapiTransactionService {
private TransactionStatus myTransactionStatus;
public MockHapiTransactionService() {
this(new SimpleTransactionStatus());
}
public MockHapiTransactionService(TransactionStatus theTransactionStatus) {
myTransactionStatus = theTransactionStatus;
}
@Nullable
@Override
protected <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
return theCallback.doInTransaction(new SimpleTransactionStatus());
return theCallback.doInTransaction(myTransactionStatus);
}
}

View File

@ -110,7 +110,7 @@ public class Batch2JobHelper {
} catch (ConditionTimeoutException e) {
String statuses = myJobPersistence.fetchInstances(100, 0)
.stream()
.map(t -> t.getJobDefinitionId() + "/" + t.getStatus().name())
.map(t -> t.getInstanceId() + " " + t.getJobDefinitionId() + "/" + t.getStatus().name())
.collect(Collectors.joining("\n"));
String currentStatus = myJobCoordinator.getInstance(theBatchJobId).getStatus().name();
fail("Job " + theBatchJobId + " still has status " + currentStatus + " - All statuses:\n" + statuses);

View File

@ -25,10 +25,16 @@ import ca.uhn.fhir.jpa.batch2.JpaBatch2Config;
import ca.uhn.fhir.jpa.config.HapiJpaConfig;
import ca.uhn.fhir.jpa.config.JpaDstu2Config;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.system.HapiTestSystemProperties;
import ca.uhn.fhir.validation.IInstanceValidatorModule;
import ca.uhn.fhir.validation.ResultSeverityEnum;
@ -205,4 +211,5 @@ public class TestDstu2Config {
return requestValidator;
}
}

View File

@ -26,15 +26,21 @@ import ca.uhn.fhir.jpa.config.HapiJpaConfig;
import ca.uhn.fhir.jpa.config.PackageLoaderConfig;
import ca.uhn.fhir.jpa.config.dstu3.JpaDstu3Config;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.EmailSenderImpl;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.IEmailSender;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.rest.server.mail.IMailSvc;
import ca.uhn.fhir.rest.server.mail.MailConfig;
import ca.uhn.fhir.rest.server.mail.MailSvc;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.system.HapiTestSystemProperties;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
@ -220,5 +226,4 @@ public class TestDstu3Config {
return new PropertySourcesPlaceholderConfigurer();
}
}

View File

@ -27,11 +27,17 @@ import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.config.HapiJpaConfig;
import ca.uhn.fhir.jpa.config.r4b.JpaR4BConfig;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.topic.SubscriptionTopicConfig;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.system.HapiTestSystemProperties;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;

View File

@ -28,12 +28,18 @@ import ca.uhn.fhir.jpa.config.HapiJpaConfig;
import ca.uhn.fhir.jpa.config.PackageLoaderConfig;
import ca.uhn.fhir.jpa.config.r4.JpaR4Config;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.searchparam.config.NicknameServiceConfig;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
import ca.uhn.fhir.jpa.util.CurrentThreadCaptureQueriesListener;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import ca.uhn.fhir.system.HapiTestSystemProperties;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;

View File

@ -0,0 +1,48 @@
/*-
* #%L
* HAPI FHIR JPA Server Test Utilities
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.test.config;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SynchronousSubscriptionMatcherInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* Production environments submit modified resources to the subscription processing pipeline asynchronously, ie, a
* modified resource is 'planned' for submission which is performed at a later time by a scheduled task.
*
* The purpose of this class is to provide submission of modified resources during tests since task scheduling required
* for asynchronous submission are either disabled or not present in testing context.
*
* Careful consideration is advised when configuring test context as the SubscriptionMatcherInterceptor Bean instantiated
* below will overwrite the Bean provided by class SubscriptionMatcherInterceptorConfig if both configuration classes
* are present in the context.
*/
@Configuration
public class TestSubscriptionMatcherInterceptorConfig {
@Primary
@Bean
public SubscriptionMatcherInterceptor subscriptionMatcherInterceptor() {
return new SynchronousSubscriptionMatcherInterceptor();
}
}

View File

@ -29,8 +29,8 @@ import ca.uhn.fhir.jpa.subscription.match.deliver.email.EmailSenderImpl;
import ca.uhn.fhir.jpa.subscription.match.deliver.email.SubscriptionDeliveringEmailSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionSubmitInterceptorLoader;
import ca.uhn.fhir.jpa.subscription.submit.svc.ResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import org.hl7.fhir.dstu2.model.Subscription;
import org.hl7.fhir.instance.model.api.IIdType;
@ -45,7 +45,7 @@ public class SubscriptionTestUtil {
@Autowired
private SubscriptionSubmitInterceptorLoader mySubscriptionSubmitInterceptorLoader;
@Autowired
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
private ResourceModifiedSubmitterSvc myResourceModifiedSubmitterSvc;
@Autowired
private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
@ -56,7 +56,7 @@ public class SubscriptionTestUtil {
private IInterceptorService myInterceptorRegistry;
public int getExecutorQueueSize() {
LinkedBlockingChannel channel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest();
LinkedBlockingChannel channel = (LinkedBlockingChannel) myResourceModifiedSubmitterSvc.getProcessingChannelForUnitTest();
return channel.getQueueSizeForUnitTest();
}

Some files were not shown because too many files have changed in this diff Show More