2480 partition aware subscription (#3218)

* some fixmes to start

* Added some FIXMEs

* added RequestPartitionId to reosurceDelivryMessage and ResourceModifiedMessage

* ResourceDeliveryMessage and ResourceModifiedMessage tests

* fixed issue with test missing partitionHelperSvc mocked bean

* Added tests and implemented SubscriptionMatchingSubscriber for partition aware subscription

* modified implementation of partitionId in CanonicalSubscription

* Moved PartitionablePartitionId, and refactored all calls to getUserData(Constants.RESOURCE_PARTITION_ID) and setUserData(Constants.RESOURCE_PARTITION_ID)

* Revert "Moved PartitionablePartitionId, and refactored all calls to getUserData(Constants.RESOURCE_PARTITION_ID) and setUserData(Constants.RESOURCE_PARTITION_ID)"

This reverts commit fe40fb9733.

* Got added partitionId to subscriptions, added changes to make SubscriptionMatchingSubscriberTest work

* added SubscriptionTriggering test, also added partition support to subscriptionLoader

* Changed implementation for storing partition id of subscriptions from messages, refactored tests to new implementation

* added all subscription systemRequestDetails with all partition to subscription reader

* refactored a generic system request details with default all partition request

* Added test for dao subscriptions, fixes to get the test working

* added partition support for latest version delivery

* added doc changes and changelog for multitenancy subscription

* cleanup and added partitioned subscription manually trigger test

* fixed mocked subscriptionDao

* added package-info for subscription module

* some code review changes

* removed AllPartitionSystemRequestDetail, added new text for multitenant subscription

* renamed method for code review

* version bump to 5.7.0PRE7

Co-authored-by: Michael Buckley <michael.buckley@smilecdr.com>
Co-authored-by: Ken Stevens <khstevens@gmail.com>
Co-authored-by: Long Ma <long@smilecdr.com>
Co-authored-by: Steven Li <steven@smilecdr.com>
This commit is contained in:
longma1 2021-12-13 08:45:30 -07:00 committed by GitHub
parent d6e543a478
commit a56603daa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
87 changed files with 782 additions and 170 deletions

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -10,7 +10,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId> <artifactId>hapi-fhir-cli</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath> <relativePath>../../hapi-deployable-pom</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -0,0 +1,4 @@
---
type: add
issue: 2480
title: "Added partition support for subscriptions. Subscriptions will now only match resource from the same partition"

View File

@ -95,13 +95,13 @@ The following resource types may not be placed in any partition except the defau
* CodeSystem * CodeSystem
* CompartmentDefinition * CompartmentDefinition
* ConceptMap * ConceptMap
* Library
* NamingSystem * NamingSystem
* OperationDefinition * OperationDefinition
* Questionnaire * Questionnaire
* SearchParameter * SearchParameter
* StructureDefinition * StructureDefinition
* StructureMap * StructureMap
* Subscription
* ValueSet * ValueSet
## Examples ## Examples
@ -150,19 +150,17 @@ None of the limitations listed here are considered permanent. Over time the HAPI
* CodeSystem * CodeSystem
* CompartmentDefinition * CompartmentDefinition
* ConceptMap * ConceptMap
* Library
* NamingSystem * NamingSystem
* OperationDefinition * OperationDefinition
* Questionnaire * Questionnaire
* SearchParameter * SearchParameter
* StructureDefinition * StructureDefinition
* StructureMap * StructureMap
* Subscription
* ValueSet * ValueSet
* **Server Capability Statement is not partition aware**: The server creates and exposes a single server capability statement, covering all partitions. This can be misleading when partitioning us used as a multitenancy strategy. * **Server Capability Statement is not partition aware**: The server creates and exposes a single server capability statement, covering all partitions. This can be misleading when partitioning us used as a multitenancy strategy.
* **Subscriptions may not be partitioned**: All subscriptions must be placed in the default partition, and subscribers will receive deliveries for any matching resources from all partitions.
* **Conformance resources may not be partitioned**: Conformance resources must be placed in the default partition, and will be shared for any validation activities across all partitions. * **Conformance resources may not be partitioned**: Conformance resources must be placed in the default partition, and will be shared for any validation activities across all partitions.
* **Search Parameters are not partitioned**: There is only one set of SearchParameter resources for the entire system, and any search parameters will apply to resources in all partitions. All SearchParameter resources must be stored in the default partition. * **Search Parameters are not partitioned**: There is only one set of SearchParameter resources for the entire system, and any search parameters will apply to resources in all partitions. All SearchParameter resources must be stored in the default partition.
@ -174,3 +172,5 @@ None of the limitations listed here are considered permanent. Over time the HAPI
* **Package Operations are not partition aware**: Package operations will only create, update and query resources in the default partition. * **Package Operations are not partition aware**: Package operations will only create, update and query resources in the default partition.
* **Advanced Elasticsearch indexing is not partition optimized**: The results are correctly partitioned, but the extended indexing is not optimized to account for partitions. * **Advanced Elasticsearch indexing is not partition optimized**: The results are correctly partitioned, but the extended indexing is not optimized to account for partitions.
* **Subscriptions are partition aware**: Subscriptions can be placed on any partition and will deliver matching resources from the same partition.

View File

@ -11,7 +11,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -75,7 +75,6 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
myNonPartitionableResourceNames = new HashSet<>(); myNonPartitionableResourceNames = new HashSet<>();
// Infrastructure // Infrastructure
myNonPartitionableResourceNames.add("Subscription");
myNonPartitionableResourceNames.add("SearchParameter"); myNonPartitionableResourceNames.add("SearchParameter");
// Validation and Conformance // Validation and Conformance
@ -85,6 +84,8 @@ public class RequestPartitionHelperSvc implements IRequestPartitionHelperSvc {
myNonPartitionableResourceNames.add("CompartmentDefinition"); myNonPartitionableResourceNames.add("CompartmentDefinition");
myNonPartitionableResourceNames.add("OperationDefinition"); myNonPartitionableResourceNames.add("OperationDefinition");
myNonPartitionableResourceNames.add("Library");
// Terminology // Terminology
myNonPartitionableResourceNames.add("ConceptMap"); myNonPartitionableResourceNames.add("ConceptMap");
myNonPartitionableResourceNames.add("CodeSystem"); myNonPartitionableResourceNames.add("CodeSystem");

View File

@ -200,7 +200,7 @@ public abstract class BasePartitioningR4Test extends BaseJpaR4SystemTest {
} }
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ) @Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId PartitionIdentifyRead(ServletRequestDetails theRequestDetails) { public RequestPartitionId partitionIdentifyRead(ServletRequestDetails theRequestDetails) {
RequestPartitionId retVal = myReadRequestPartitionIds.remove(0); RequestPartitionId retVal = myReadRequestPartitionIds.remove(0);
ourLog.info("Returning partition for read: {}", retVal); ourLog.info("Returning partition for read: {}", retVal);
return retVal; return retVal;

View File

@ -0,0 +1,212 @@
package ca.uhn.fhir.jpa.partition;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.dao.r4.BasePartitioningR4Test;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.resthook.RestHookTestR4Test;
import ca.uhn.fhir.jpa.subscription.triggering.ISubscriptionTriggeringSvc;
import ca.uhn.fhir.rest.api.Constants;
import org.awaitility.core.ConditionTimeoutException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.ServletException;
import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
public class PartitionedSubscriptionTriggeringR4Test extends BaseSubscriptionsR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(RestHookTestR4Test.class);
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@Autowired
private ISubscriptionTriggeringSvc mySubscriptionTriggeringSvc;
static final String PARTITION_1 = "PART-1";
static final String PARTITION_2 = "PART-2";
protected MyReadWriteInterceptor myPartitionInterceptor;
protected LocalDate myPartitionDate;
protected LocalDate myPartitionDate2;
protected int myPartitionId;
protected int myPartitionId2;
@BeforeEach
public void beforeEach() throws ServletException {
myPartitionSettings.setPartitioningEnabled(true);
myPartitionSettings.setIncludePartitionInSearchHashes(new PartitionSettings().isIncludePartitionInSearchHashes());
myDaoConfig.setUniqueIndexesEnabled(true);
myModelConfig.setDefaultSearchParamsCanBeOverridden(true);
myPartitionDate = LocalDate.of(2020, Month.JANUARY, 14);
myPartitionDate2 = LocalDate.of(2020, Month.JANUARY, 15);
myPartitionId = 1;
myPartitionId2 = 2;
myPartitionInterceptor = new MyReadWriteInterceptor();
myPartitionInterceptor.setResultPartitionId(RequestPartitionId.fromPartitionNames(PARTITION_1));
mySrdInterceptorService.registerInterceptor(myPartitionInterceptor);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1));
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2));
myDaoConfig.setIndexMissingFields(DaoConfig.IndexEnabledEnum.ENABLED);
}
@AfterEach
@Override
public void afterUnregisterRestHookListener() {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
myDaoConfig.setTriggerSubscriptionsForNonVersioningChanges(new DaoConfig().isTriggerSubscriptionsForNonVersioningChanges());
myDaoRegistry.getSystemDao().expunge(new ExpungeOptions().setExpungeEverything(true), null);
myPartitionSettings.setUnnamedPartitionMode(false);
mySrdInterceptorService.unregisterInterceptorsIf(t -> t instanceof BasePartitioningR4Test.MyReadWriteInterceptor);
myInterceptor = null;
}
@Test
public void testCreateSubscriptionInPartition() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
Subscription subscription = newSubscription(criteria1, payload);
assertEquals(mySrdInterceptorService.getAllRegisteredInterceptors().size(), 1);
myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd);
waitForActivatedSubscriptionCount(1);
Observation observation = createBaseObservation(code, "SNOMED-CT");
myDaoRegistry.getResourceDao("Observation").create(observation, mySrd);
// Should see 1 subscription notification
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
public void testCreateSubscriptionInPartitionAndResourceInDifferentPartition() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Patient?active=true";
Subscription subscription = newSubscription(criteria1, payload);
assertEquals(mySrdInterceptorService.getAllRegisteredInterceptors().size(), 1);
myDaoRegistry.getResourceDao("Subscription").create(subscription, mySrd);
waitForActivatedSubscriptionCount(1);
Patient patient = new Patient();
patient.setActive(true);
myDaoRegistry.getResourceDao("Patient").create(patient, new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.fromPartitionId(2)));
// Should see 0 subscription notification
waitForQueueToDrain();
assertEquals(0, ourPatientProvider.getCountCreate());
try {
// Should have 0 matching subscription, if we get 1 update count then the test fails
ourPatientProvider.waitForUpdateCount(1);
fail();
} catch (ConditionTimeoutException e) {
assertEquals(0, ourRestfulServer.getRequestContentTypes().size());
}
}
@Test
public void testManualTriggeredSubscriptionInPartition() throws Exception {
String payload = "application/fhir+json";
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
// Create the resource first
DaoMethodOutcome observationOutcome = myDaoRegistry.getResourceDao("Observation").create(createBaseObservation(code, "SNOMED-CT"), mySrd);
Observation observation = (Observation) observationOutcome.getResource();
// Create the subscription now
DaoMethodOutcome subscriptionOutcome = myDaoRegistry.getResourceDao("Subscription").create(newSubscription(criteria1, payload), mySrd);
assertEquals(mySrdInterceptorService.getAllRegisteredInterceptors().size(), 1);
Subscription subscription = (Subscription) subscriptionOutcome.getResource();
waitForActivatedSubscriptionCount(1);
ArrayList<IPrimitiveType<String>> resourceIdList = new ArrayList<>();
resourceIdList.add(observation.getIdElement());
Parameters resultParameters = (Parameters) mySubscriptionTriggeringSvc.triggerSubscription(resourceIdList, null, subscription.getIdElement());
waitForQueueToDrain();
assertEquals(0, ourObservationProvider.getCountCreate());
String responseValue = resultParameters.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
}
@Interceptor
public static class MyReadWriteInterceptor {
private RequestPartitionId myReadPartitionId;
public void setResultPartitionId(RequestPartitionId theRequestPartitionId) {
myReadPartitionId = theRequestPartitionId;
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId read() {
RequestPartitionId retVal = myReadPartitionId;
ourLog.info("Returning partition for read: {}", retVal);
return retVal;
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId create() {
RequestPartitionId retVal = myReadPartitionId;
ourLog.info("Returning partition for write: {}", retVal);
return retVal;
}
}
}

View File

@ -142,20 +142,25 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
} }
protected Observation sendObservation(String code, String system) { protected Observation sendObservation(String theCode, String theSystem) {
Observation observation = createBaseObservation(theCode, theSystem);
IIdType id = myObservationDao.create(observation).getId();
observation.setId(id);
return observation;
}
protected Observation createBaseObservation(String theCode, String theSystem) {
Observation observation = new Observation(); Observation observation = new Observation();
CodeableConcept codeableConcept = new CodeableConcept(); CodeableConcept codeableConcept = new CodeableConcept();
observation.setCode(codeableConcept); observation.setCode(codeableConcept);
observation.getIdentifierFirstRep().setSystem("foo").setValue("1"); observation.getIdentifierFirstRep().setSystem("foo").setValue("1");
Coding coding = codeableConcept.addCoding(); Coding coding = codeableConcept.addCoding();
coding.setCode(code); coding.setCode(theCode);
coding.setSystem(system); coding.setSystem(theSystem);
observation.setStatus(Observation.ObservationStatus.FINAL); observation.setStatus(Observation.ObservationStatus.FINAL);
IIdType id = myObservationDao.create(observation).getId();
observation.setId(id);
return observation; return observation;
} }

View File

@ -7,7 +7,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -24,7 +24,7 @@ public class ProducingChannelParameters extends BaseChannelParameters {
/** /**
* Constructor * Constructor
* * <p>
* Producing channels are sending channels. They send data to topics/queues. * Producing channels are sending channels. They send data to topics/queues.
* *
* @param theChannelName * @param theChannelName

View File

@ -24,7 +24,7 @@ public class ReceivingChannelParameters extends BaseChannelParameters {
/** /**
* Constructor * Constructor
* * <p>
* Receiving channels are channels that receive data from topics/queues * Receiving channels are channels that receive data from topics/queues
* *
* @param theChannelName * @param theChannelName

View File

@ -23,8 +23,10 @@ package ca.uhn.fhir.jpa.subscription.match.deliver.resthook;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber; import ca.uhn.fhir.jpa.subscription.match.deliver.BaseSubscriptionDeliverySubscriber;
@ -162,10 +164,11 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
return operation; return operation;
} }
public IBaseResource getResource(IIdType payloadId) throws ResourceGoneException { public IBaseResource getResource(IIdType payloadId, RequestPartitionId thePartitionId) throws ResourceGoneException {
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(payloadId.getResourceType()); RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(payloadId.getResourceType());
SystemRequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(thePartitionId);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass()); IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceDef.getImplementingClass());
return dao.read(payloadId.toVersionless()); return dao.read(payloadId.toVersionless(), systemRequestDetails);
} }
@ -177,7 +180,7 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
try { try {
if (payloadId != null) { if (payloadId != null) {
payloadResource = getResource(payloadId.toVersionless()); payloadResource = getResource(payloadId.toVersionless(), theMsg.getRequestPartitionId());
} else { } else {
return null; return null;
} }

View File

@ -24,12 +24,15 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.entity.PartitionablePartitionId;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult; import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
public class DaoSubscriptionMatcher implements ISubscriptionMatcher { public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
private Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class); private final Logger ourLog = LoggerFactory.getLogger(DaoSubscriptionMatcher.class);
@Autowired @Autowired
DaoRegistry myDaoRegistry; DaoRegistry myDaoRegistry;
@ -56,7 +59,7 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
// Run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource // Run the subscriptions query and look for matches, add the id as part of the criteria to avoid getting matches of previous resources rather than the recent resource
criteria += "&_id=" + id.toUnqualifiedVersionless().getValue(); criteria += "&_id=" + id.toUnqualifiedVersionless().getValue();
IBundleProvider results = performSearch(criteria); IBundleProvider results = performSearch(criteria, theSubscription);
ourLog.debug("Subscription check found {} results for query: {}", results.size(), criteria); ourLog.debug("Subscription check found {} results for query: {}", results.size(), criteria);
@ -66,7 +69,7 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
/** /**
* Search based on a query criteria * Search based on a query criteria
*/ */
private IBundleProvider performSearch(String theCriteria) { private IBundleProvider performSearch(String theCriteria, CanonicalSubscription theSubscription) {
IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao(); IFhirResourceDao<?> subscriptionDao = myDaoRegistry.getSubscriptionDao();
RuntimeResourceDefinition responseResourceDef = subscriptionDao.validateCriteriaAndReturnResourceDefinition(theCriteria); RuntimeResourceDefinition responseResourceDef = subscriptionDao.validateCriteriaAndReturnResourceDefinition(theCriteria);
SearchParameterMap responseCriteriaUrl = myMatchUrlService.translateMatchUrl(theCriteria, responseResourceDef); SearchParameterMap responseCriteriaUrl = myMatchUrlService.translateMatchUrl(theCriteria, responseResourceDef);
@ -74,7 +77,9 @@ public class DaoSubscriptionMatcher implements ISubscriptionMatcher {
IFhirResourceDao<? extends IBaseResource> responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass()); IFhirResourceDao<? extends IBaseResource> responseDao = myDaoRegistry.getResourceDao(responseResourceDef.getImplementingClass());
responseCriteriaUrl.setLoadSynchronousUpTo(1); responseCriteriaUrl.setLoadSynchronousUpTo(1);
return responseDao.search(responseCriteriaUrl); PartitionablePartitionId partitionId = new PartitionablePartitionId(theSubscription.getRequestPartitionId(), null);
RequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(partitionId.toPartitionId());
return responseDao.search(responseCriteriaUrl, systemRequestDetails);
} }
} }

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
@ -49,7 +50,7 @@ import javax.annotation.Nonnull;
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription. * Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
*/ */
public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler { public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); private final Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
@Autowired @Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
@ -115,19 +116,21 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private boolean activateSubscription(final IBaseResource theSubscription) { private boolean activateSubscription(final IBaseResource theSubscription) {
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao(); IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement()); SystemRequestDetails srd = SystemRequestDetails.forAllPartition();
IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartition());
subscription.setId(subscription.getIdElement().toVersionless()); subscription.setId(subscription.getIdElement().toVersionless());
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS); ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS);
try { try {
SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS); SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
subscriptionDao.update(subscription); subscriptionDao.update(subscription, srd);
return true; return true;
} catch (final UnprocessableEntityException e) { } catch (final UnprocessableEntityException e) {
ourLog.info("Changing status of {} to ERROR", subscription.getIdElement()); ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
SubscriptionUtil.setStatus(myFhirContext, subscription, "error"); SubscriptionUtil.setStatus(myFhirContext, subscription, "error");
SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage()); SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage());
subscriptionDao.update(subscription); subscriptionDao.update(subscription, srd);
return false; return false;
} }
} }

View File

@ -52,7 +52,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
*/ */
public class SubscriptionMatchingSubscriber implements MessageHandler { public class SubscriptionMatchingSubscriber implements MessageHandler {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class); private final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class);
public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching"; public static final String SUBSCRIPTION_MATCHING_CHANNEL_NAME = "subscription-matching";
@Autowired @Autowired
@ -123,7 +123,12 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
boolean resourceMatched = false; boolean resourceMatched = false;
for (ActiveSubscription nextActiveSubscription : subscriptions) { for (ActiveSubscription nextActiveSubscription : subscriptions) {
// skip if the partitions don't match
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
if (subscription != null && subscription.getRequestPartitionId() != null && theMsg.getPartitionId() != null
&& !theMsg.getPartitionId().hasPartitionId(subscription.getRequestPartitionId())) {
continue;
}
String nextSubscriptionId = getId(nextActiveSubscription); String nextSubscriptionId = getId(nextActiveSubscription);
if (isNotBlank(theMsg.getSubscriptionId())) { if (isNotBlank(theMsg.getSubscriptionId())) {
@ -154,15 +159,15 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
} }
IBaseResource payload = theMsg.getNewPayload(myFhirContext); IBaseResource payload = theMsg.getNewPayload(myFhirContext);
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
EncodingEnum encoding = null; EncodingEnum encoding = null;
if (subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) { if (subscription != null && subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
encoding = EncodingEnum.forContentType(subscription.getPayloadString()); encoding = EncodingEnum.forContentType(subscription.getPayloadString());
} }
encoding = defaultIfNull(encoding, EncodingEnum.JSON); encoding = defaultIfNull(encoding, EncodingEnum.JSON);
ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage(); ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
deliveryMsg.setPartitionId(theMsg.getPartitionId());
deliveryMsg.setPayload(myFhirContext, payload, encoding); deliveryMsg.setPayload(myFhirContext, payload, encoding);
deliveryMsg.setSubscription(subscription); deliveryMsg.setSubscription(subscription);

View File

@ -21,10 +21,15 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -42,13 +47,17 @@ import javax.annotation.Nonnull;
* Also validates criteria. If invalid, rejects the subscription without persisting the subscription. * Also validates criteria. If invalid, rejects the subscription without persisting the subscription.
*/ */
public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler { public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscriptionResources implements MessageHandler {
private Logger ourLog = LoggerFactory.getLogger(SubscriptionRegisteringSubscriber.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegisteringSubscriber.class);
@Autowired @Autowired
private FhirContext myFhirContext; private FhirContext myFhirContext;
@Autowired @Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired @Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer; private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
@Autowired
private PartitionSettings myPartitionSettings;
@Autowired
private DaoRegistry myDaoRegistry;
/** /**
* Constructor * Constructor
@ -77,9 +86,18 @@ public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscrip
case CREATE: case CREATE:
case UPDATE: case UPDATE:
IBaseResource subscription = payload.getNewPayload(myFhirContext); IBaseResource subscription = payload.getNewPayload(myFhirContext);
IBaseResource subscriptionToRegister = subscription;
String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(subscription); String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(subscription);
// reading resource back from db in order to store partition id in the userdata of the resource for partitioned subscriptions
if (myPartitionSettings.isPartitioningEnabled()) {
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
RequestDetails systemRequestDetails = new SystemRequestDetails().setRequestPartitionId(payload.getPartitionId());
subscriptionToRegister = subscriptionDao.read(subscription.getIdElement(), systemRequestDetails);
}
if ("active".equals(statusString)) { if ("active".equals(statusString)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payload.getNewPayload(myFhirContext)); mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscriptionToRegister);
} else { } else {
mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payload.getPayloadId(myFhirContext).getIdPart()); mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payload.getPayloadId(myFhirContext).getIdPart());
} }

View File

@ -27,13 +27,14 @@ import ca.uhn.fhir.jpa.cache.IResourceChangeListener;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.jpa.searchparam.retry.Retrier; import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber; import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenOrListParam; import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam; import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
@ -75,6 +76,7 @@ public class SubscriptionLoader implements IResourceChangeListener {
private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
private SearchParameterMap mySearchParameterMap; private SearchParameterMap mySearchParameterMap;
private SystemRequestDetails mySystemRequestDetails;
/** /**
* Constructor * Constructor
@ -86,6 +88,8 @@ public class SubscriptionLoader implements IResourceChangeListener {
@PostConstruct @PostConstruct
public void registerListener() { public void registerListener() {
mySearchParameterMap = getSearchParameterMap(); mySearchParameterMap = getSearchParameterMap();
mySystemRequestDetails = SystemRequestDetails.forAllPartition();
IResourceChangeListenerCache subscriptionCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener("Subscription", mySearchParameterMap, this, REFRESH_INTERVAL); IResourceChangeListenerCache subscriptionCache = myResourceChangeListenerRegistry.registerResourceResourceChangeListener("Subscription", mySearchParameterMap, this, REFRESH_INTERVAL);
subscriptionCache.forceRefresh(); subscriptionCache.forceRefresh();
} }
@ -142,7 +146,7 @@ public class SubscriptionLoader implements IResourceChangeListener {
synchronized (mySyncSubscriptionsLock) { synchronized (mySyncSubscriptionsLock) {
ourLog.debug("Starting sync subscriptions"); ourLog.debug("Starting sync subscriptions");
IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap); IBundleProvider subscriptionBundleList = getSubscriptionDao().search(mySearchParameterMap, mySystemRequestDetails);
Integer subscriptionCount = subscriptionBundleList.size(); Integer subscriptionCount = subscriptionBundleList.size();
assert subscriptionCount != null; assert subscriptionCount != null;

View File

@ -0,0 +1,11 @@
/**
* Module to support Subscriptions
* <p>
* Subscriptions are partition aware
* <p>
* The functionalities of this module follows the HL7 spec on Subscriptions:
* http://hl7.org/fhir/subscription.html
* <p>
* Activated by {@link ca.uhn.fhir.jpa.model.config.PartitionSettings#setPartitioningEnabled(boolean)}
*/
package ca.uhn.fhir.jpa.subscription;

View File

@ -6,7 +6,9 @@ import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Interceptor; import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel; import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
@ -60,6 +62,8 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
private SubscriptionChannelFactory mySubscriptionChannelFactory; private SubscriptionChannelFactory mySubscriptionChannelFactory;
@Autowired @Autowired
private DaoConfig myDaoConfig; private DaoConfig myDaoConfig;
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private volatile MessageChannel myMatchingChannel; private volatile MessageChannel myMatchingChannel;
@ -114,7 +118,9 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
*/ */
@Override @Override
public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) { public void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType, RequestDetails theRequest) {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType, theRequest); // Even though the resource is being written, the subscription will be interacting with it by effectively "reading" it so we set the RequestPartitionId as a read request
RequestPartitionId requestPartitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead(theRequest, theNewResource.getIdElement().getResourceType(), theNewResource.getIdElement());
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType, theRequest, requestPartitionId);
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED // Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
HookParams params = new HookParams() HookParams params = new HookParams()

View File

@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService; import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
@ -114,7 +115,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
if (!subscriptionId.hasResourceType()) { if (!subscriptionId.hasResourceType()) {
subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode()); subscriptionId = subscriptionId.withResourceType(ResourceTypeEnum.SUBSCRIPTION.getCode());
} }
subscriptionDao.read(subscriptionId); subscriptionDao.read(subscriptionId, SystemRequestDetails.forAllPartition());
} }
List<IPrimitiveType<String>> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList()); List<IPrimitiveType<String>> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
@ -298,7 +299,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) { private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId); IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartition());
return submitResource(theSubscriptionId, resourceToTrigger); return submitResource(theSubscriptionId, resourceToTrigger);
} }

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.match.deliver;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
@ -13,19 +14,28 @@ import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.client.api.IGenericClient; import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory; import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.hl7.fhir.r4.model.IdType; import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Patient; import org.hl7.fhir.r4.model.Patient;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers; import org.mockito.Answers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.GenericMessage;
import java.time.LocalDate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
@ -35,9 +45,10 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class BaseSubscriptionDeliverySubscriberTest { public class BaseSubscriptionDeliverySubscriberTest {
private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriberTest.class);
private SubscriptionDeliveringRestHookSubscriber mySubscriber; private SubscriptionDeliveringRestHookSubscriber mySubscriber;
private FhirContext myCtx = FhirContext.forR4(); private final FhirContext myCtx = FhirContext.forR4();
@Mock @Mock
private IInterceptorBroadcaster myInterceptorBroadcaster; private IInterceptorBroadcaster myInterceptorBroadcaster;
@ -79,13 +90,9 @@ public class BaseSubscriptionDeliverySubscriberTest {
public void testRestHookDeliverySuccessful() { public void testRestHookDeliverySuccessful() {
when(myInterceptorBroadcaster.callHooks(any(), any())).thenReturn(true); when(myInterceptorBroadcaster.callHooks(any(), any())).thenReturn(true);
Patient patient = new Patient(); Patient patient = generatePatient();
patient.setActive(true);
CanonicalSubscription subscription = new CanonicalSubscription(); CanonicalSubscription subscription = generateSubscription();
subscription.setIdElement(new IdType("Subscription/123"));
subscription.setEndpointUrl("http://example.com/fhir");
subscription.setPayloadString("application/fhir+json");
ResourceDeliveryMessage payload = new ResourceDeliveryMessage(); ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription); payload.setSubscription(subscription);
@ -101,13 +108,9 @@ public class BaseSubscriptionDeliverySubscriberTest {
public void testRestHookDeliveryFails_ShouldRollBack() { public void testRestHookDeliveryFails_ShouldRollBack() {
when(myInterceptorBroadcaster.callHooks(any(), any())).thenReturn(true); when(myInterceptorBroadcaster.callHooks(any(), any())).thenReturn(true);
Patient patient = new Patient(); Patient patient = generatePatient();
patient.setActive(true);
CanonicalSubscription subscription = new CanonicalSubscription(); CanonicalSubscription subscription = generateSubscription();
subscription.setIdElement(new IdType("Subscription/123"));
subscription.setEndpointUrl("http://example.com/fhir");
subscription.setPayloadString("application/fhir+json");
ResourceDeliveryMessage payload = new ResourceDeliveryMessage(); ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription); payload.setSubscription(subscription);
@ -132,13 +135,9 @@ public class BaseSubscriptionDeliverySubscriberTest {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY), any())).thenReturn(true); when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY), any())).thenReturn(true);
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED), any())).thenReturn(false); when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED), any())).thenReturn(false);
Patient patient = new Patient(); Patient patient = generatePatient();
patient.setActive(true);
CanonicalSubscription subscription = new CanonicalSubscription(); CanonicalSubscription subscription = generateSubscription();
subscription.setIdElement(new IdType("Subscription/123"));
subscription.setEndpointUrl("http://example.com/fhir");
subscription.setPayloadString("application/fhir+json");
ResourceDeliveryMessage payload = new ResourceDeliveryMessage(); ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription); payload.setSubscription(subscription);
@ -158,13 +157,9 @@ public class BaseSubscriptionDeliverySubscriberTest {
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY), any())).thenReturn(true); when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY), any())).thenReturn(true);
when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY), any())).thenReturn(false); when(myInterceptorBroadcaster.callHooks(eq(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY), any())).thenReturn(false);
Patient patient = new Patient(); Patient patient = generatePatient();
patient.setActive(true);
CanonicalSubscription subscription = new CanonicalSubscription(); CanonicalSubscription subscription = generateSubscription();
subscription.setIdElement(new IdType("Subscription/123"));
subscription.setEndpointUrl("http://example.com/fhir");
subscription.setPayloadString("application/fhir+json");
ResourceDeliveryMessage payload = new ResourceDeliveryMessage(); ResourceDeliveryMessage payload = new ResourceDeliveryMessage();
payload.setSubscription(subscription); payload.setSubscription(subscription);
@ -181,4 +176,77 @@ public class BaseSubscriptionDeliverySubscriberTest {
} }
@Test
public void testSerializeDeliveryMessageWithRequestPartition() throws JsonProcessingException {
CanonicalSubscription subscription = generateSubscription();
Patient patient = generatePatient();
ResourceDeliveryMessage message = new ResourceDeliveryMessage();
message.setPartitionId(RequestPartitionId.fromPartitionId(123, LocalDate.of(2020, 1, 1)));
message.setSubscription(subscription);
message.setPayload(myCtx, patient, EncodingEnum.JSON);
message.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceDeliveryJsonMessage jsonMessage = new ResourceDeliveryJsonMessage(message);
String jsonString = jsonMessage.asJson();
ourLog.info(jsonString);
// Assert that the partitionID is being serialized in JSON
assertThat(jsonString, containsString("\"partitionDate\":[2020,1,1]"));
assertThat(jsonString, containsString("\"partitionIds\":[123]"));
}
@Test
public void testSerializeDeliveryMessageWithNoPartition() throws JsonProcessingException {
CanonicalSubscription subscription = generateSubscription();
Patient patient = generatePatient();
ResourceDeliveryMessage message = new ResourceDeliveryMessage();
message.setSubscription(subscription);
message.setPayload(myCtx, patient, EncodingEnum.JSON);
message.setOperationType(ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceDeliveryJsonMessage jsonMessage = new ResourceDeliveryJsonMessage(message);
String jsonString = jsonMessage.asJson();
ourLog.info(jsonString);
assertThat(jsonString, containsString("\"operationType\":\"CREATE"));
assertThat(jsonString, containsString("\"canonicalSubscription\":"));
// Assert that the default partitionID is being generated and is being serialized in JSON
assertThat(jsonString, containsString("\"allPartitions\":false"));
assertThat(jsonString, containsString("\"partitionIds\":[null]"));
}
@Test
public void testSerializeLegacyDeliveryMessage() throws JsonProcessingException {
String legacyDeliveryMessageJson = "{\"headers\":{\"retryCount\":0,\"customHeaders\":{}},\"payload\":{\"operationType\":\"CREATE\",\"canonicalSubscription\":{\"id\":\"Subscription/123\",\"endpointUrl\":\"http://example.com/fhir\",\"payload\":\"application/fhir+json\"},\"payload\":\"{\\\"resourceType\\\":\\\"Patient\\\",\\\"active\\\":true}\"}}";
ResourceDeliveryJsonMessage jsonMessage = ResourceDeliveryJsonMessage.fromJson(legacyDeliveryMessageJson);
ourLog.info(jsonMessage.getPayload().getRequestPartitionId().asJson());
assertNotNull(jsonMessage.getPayload().getRequestPartitionId());
assertEquals(jsonMessage.getPayload().getRequestPartitionId().toJson(), RequestPartitionId.defaultPartition().toJson());
}
@NotNull
private Patient generatePatient() {
Patient patient = new Patient();
patient.setActive(true);
return patient;
}
@NotNull
private CanonicalSubscription generateSubscription() {
CanonicalSubscription subscription = new CanonicalSubscription();
subscription.setIdElement(new IdType("Subscription/123"));
subscription.setEndpointUrl("http://example.com/fhir");
subscription.setPayloadString("application/fhir+json");
return subscription;
}
} }

View File

@ -1,10 +1,13 @@
package ca.uhn.fhir.jpa.subscription.module; package ca.uhn.fhir.jpa.subscription.module;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Organization;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.time.LocalDate;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
@ -22,6 +25,7 @@ public class ResourceModifiedTest {
Organization decodedOrg = (Organization) msg.getNewPayload(myFhirContext); Organization decodedOrg = (Organization) msg.getNewPayload(myFhirContext);
assertEquals(org.getId(), decodedOrg.getId()); assertEquals(org.getId(), decodedOrg.getId());
assertEquals(org.getName(), decodedOrg.getName()); assertEquals(org.getName(), decodedOrg.getName());
assertEquals(msg.getPartitionId().toJson(), RequestPartitionId.defaultPartition().toJson());
} }
@Test @Test
@ -35,6 +39,7 @@ public class ResourceModifiedTest {
Organization decodedOrg = (Organization) msg.getNewPayload(myFhirContext); Organization decodedOrg = (Organization) msg.getNewPayload(myFhirContext);
assertEquals(org.getId(), decodedOrg.getId()); assertEquals(org.getId(), decodedOrg.getId());
assertEquals(org.getName(), decodedOrg.getName()); assertEquals(org.getName(), decodedOrg.getName());
assertEquals(msg.getPartitionId().toJson(), RequestPartitionId.defaultPartition().toJson());
} }
@Test @Test
@ -46,6 +51,19 @@ public class ResourceModifiedTest {
assertEquals("Organization/testOrgId", msg.getPayloadId(myFhirContext).getValue()); assertEquals("Organization/testOrgId", msg.getPayloadId(myFhirContext).getValue());
assertEquals(ResourceModifiedMessage.OperationTypeEnum.DELETE, msg.getOperationType()); assertEquals(ResourceModifiedMessage.OperationTypeEnum.DELETE, msg.getOperationType());
assertNull(msg.getNewPayload(myFhirContext)); assertNull(msg.getNewPayload(myFhirContext));
assertEquals(msg.getPartitionId().toJson(), RequestPartitionId.defaultPartition().toJson());
}
@Test
public void testCreateWithPartition() {
Organization org = new Organization();
org.setName("testOrgName");
org.setId("Organization/testOrgId");
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, org, ResourceModifiedMessage.OperationTypeEnum.CREATE);
msg.setPartitionId(RequestPartitionId.fromPartitionId(123, LocalDate.of(2020, 1, 1)));
assertEquals(msg.getPartitionId().getPartitionIds().size(), 1);
assertEquals(msg.getPartitionId().getPartitionIds().get(0), 123);
} }
} }

View File

@ -4,6 +4,11 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams; import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorService; import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer; import ca.uhn.fhir.jpa.subscription.channel.subscription.ISubscriptionDeliveryChannelNamer;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
@ -40,6 +45,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -59,6 +65,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
@Autowired @Autowired
FhirContext myFhirContext; FhirContext myFhirContext;
@Autowired
protected DaoRegistry myDaoRegistry;
// Caused by: java.lang.IllegalStateException: Unable to register mock bean org.springframework.messaging.MessageHandler expected a single matching bean to replace but found [subscriptionActivatingSubscriber, subscriptionDeliveringEmailSubscriber, subscriptionDeliveringRestHookSubscriber, subscriptionMatchingSubscriber, subscriptionRegisteringSubscriber] // Caused by: java.lang.IllegalStateException: Unable to register mock bean org.springframework.messaging.MessageHandler expected a single matching bean to replace but found [subscriptionActivatingSubscriber, subscriptionDeliveringEmailSubscriber, subscriptionDeliveringRestHookSubscriber, subscriptionMatchingSubscriber, subscriptionRegisteringSubscriber]
@ -82,6 +90,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
private SubscriptionLoader mySubscriptionLoader; private SubscriptionLoader mySubscriptionLoader;
@Autowired @Autowired
private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
@Autowired
protected PartitionSettings myPartitionSettings;
protected String myCode = "1000000050"; protected String myCode = "1000000050";
@ -96,6 +106,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED); protected final PointcutLatch mySubscriptionMatchingPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED);
protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED); protected final PointcutLatch mySubscriptionActivatedPost = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED);
protected final PointcutLatch mySubscriptionAfterDelivery = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_DELIVERY); protected final PointcutLatch mySubscriptionAfterDelivery = new PointcutLatch(Pointcut.SUBSCRIPTION_AFTER_DELIVERY);
protected final PointcutLatch mySubscriptionResourceMatched = new PointcutLatch(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED);
protected final PointcutLatch mySubscriptionResourceNotMatched = new PointcutLatch(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS);
@BeforeEach @BeforeEach
public void beforeReset() { public void beforeReset() {
@ -113,10 +125,13 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, mySubscriptionMatchingPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, mySubscriptionActivatedPost);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, mySubscriptionAfterDelivery); myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, mySubscriptionAfterDelivery);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, mySubscriptionResourceMatched);
myInterceptorRegistry.registerAnonymousInterceptor(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, mySubscriptionResourceNotMatched);
} }
@AfterEach @AfterEach
public void cleanup() { public void cleanup() {
myPartitionSettings.setPartitioningEnabled(false);
myInterceptorRegistry.unregisterAllInterceptors(); myInterceptorRegistry.unregisterAllInterceptors();
mySubscriptionMatchingPost.clear(); mySubscriptionMatchingPost.clear();
mySubscriptionActivatedPost.clear(); mySubscriptionActivatedPost.clear();
@ -125,7 +140,11 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
} }
public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException { public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE); return sendResource(theResource, null);
}
public <T extends IBaseResource> T sendResource(T theResource, RequestPartitionId theRequestPartitionId) throws InterruptedException {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResource, ResourceModifiedMessage.OperationTypeEnum.CREATE, null, theRequestPartitionId);
ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(msg); ResourceModifiedJsonMessage message = new ResourceModifiedJsonMessage(msg);
mySubscriptionMatchingPost.setExpectedCount(1); mySubscriptionMatchingPost.setExpectedCount(1);
ourSubscribableChannel.send(message); ourSubscribableChannel.send(message);
@ -133,15 +152,18 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
return theResource; return theResource;
} }
protected Subscription sendSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { protected Subscription sendSubscription(Subscription theSubscription, RequestPartitionId theRequestPartitionId, Boolean mockDao) throws InterruptedException {
Subscription subscription = makeActiveSubscription(theCriteria, thePayload, theEndpoint);
mySubscriptionActivatedPost.setExpectedCount(1); mySubscriptionActivatedPost.setExpectedCount(1);
Subscription retval = sendResource(subscription); Subscription retVal = sendResource(theSubscription, theRequestPartitionId);
mySubscriptionActivatedPost.awaitExpected(); mySubscriptionActivatedPost.awaitExpected();
return retval; return retVal;
} }
protected Observation sendObservation(String code, String system) throws InterruptedException { protected Observation sendObservation(String code, String system) throws InterruptedException {
return sendObservation(code, system, null);
}
protected Observation sendObservation(String code, String system, RequestPartitionId theRequestPartitionId) throws InterruptedException {
Observation observation = new Observation(); Observation observation = new Observation();
IdType id = new IdType("Observation", nextId()); IdType id = new IdType("Observation", nextId());
observation.setId(id); observation.setId(id);
@ -154,7 +176,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
observation.setStatus(Observation.ObservationStatus.FINAL); observation.setStatus(Observation.ObservationStatus.FINAL);
return sendResource(observation); return sendResource(observation, theRequestPartitionId);
} }
@BeforeAll @BeforeAll
@ -224,6 +246,8 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
} }
@Override @Override
public void clear() { updateLatch.clear();} public void clear() {
updateLatch.clear();
}
} }
} }

View File

@ -6,6 +6,7 @@ import org.hl7.fhir.dstu3.model.CodeableConcept;
import org.hl7.fhir.dstu3.model.Coding; import org.hl7.fhir.dstu3.model.Coding;
import org.hl7.fhir.dstu3.model.IdType; import org.hl7.fhir.dstu3.model.IdType;
import org.hl7.fhir.dstu3.model.Observation; import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -26,8 +27,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());
@ -47,8 +50,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());
@ -68,8 +73,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code; String criteria1 = "Observation?code=SNOMED-CT|" + code;
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());
@ -90,8 +97,10 @@ public class SubscriptionCheckingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());

View File

@ -1,19 +1,37 @@
package ca.uhn.fhir.jpa.subscription.module.subscriber; package ca.uhn.fhir.jpa.subscription.module.subscriber;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test; import ca.uhn.fhir.jpa.subscription.module.standalone.BaseBlockingQueueSubscribableChannelDstu3Test;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import com.google.common.collect.Lists;
import org.hl7.fhir.dstu3.model.Observation; import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Subscription;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
/** /**
* Tests copied from jpa.subscription.resthook.RestHookTestDstu3Test * Tests copied from jpa.subscription.resthook.RestHookTestDstu3Test
*/ */
public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscribableChannelDstu3Test { public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class); private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriberTest.class);
private final IFhirResourceDao<Subscription> myMockSubscriptionDao = Mockito.mock(IFhirResourceDao.class);
@BeforeEach
public void beforeEach() {
Mockito.when(myMockSubscriptionDao.getResourceType()).thenReturn(Subscription.class);
myDaoRegistry.register(myMockSubscriptionDao);
}
@Test @Test
public void testRestHookSubscriptionApplicationFhirJson() throws Exception { public void testRestHookSubscriptionApplicationFhirJson() throws Exception {
@ -23,8 +41,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());
@ -44,8 +64,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());
@ -59,16 +81,16 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
@Test @Test
public void testRestHookSubscription_NoResourceTypeInPayloadId() throws Exception { public void testRestHookSubscription_NoResourceTypeInPayloadId() throws Exception {
sendSubscription("Observation?", "application/fhir+xml", ourListenerServerBase);
assertEquals(1, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
Observation observation = new Observation(); Observation observation = new Observation();
observation.setId("OBS"); observation.setId("OBS");
observation.setStatus(Observation.ObservationStatus.CORRECTED); observation.setStatus(Observation.ObservationStatus.CORRECTED);
sendResource(observation);
Subscription subscription = makeActiveSubscription("Observation?", "application/fhir+xml", ourListenerServerBase);
sendSubscription(subscription, null, false);
assertEquals(1, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(1);
sendResource(observation);
ourObservationListener.awaitExpected(); ourObservationListener.awaitExpected();
assertEquals(1, ourContentTypes.size()); assertEquals(1, ourContentTypes.size());
@ -83,8 +105,10 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria1 = "Observation?code=SNOMED-CT|" + code; String criteria1 = "Observation?code=SNOMED-CT|" + code;
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111"; String criteria2 = "Observation?code=SNOMED-CT|" + code + "111";
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
assertEquals(2, mySubscriptionRegistry.size()); assertEquals(2, mySubscriptionRegistry.size());
@ -107,9 +131,12 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
String criteria2 = "[*]"; String criteria2 = "[*]";
String criteria3 = "Observation?code=FOO"; // won't match String criteria3 = "Observation?code=FOO"; // won't match
sendSubscription(criteria1, payload, ourListenerServerBase); Subscription subscription1 = makeActiveSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase); sendSubscription(subscription1, null, false);
sendSubscription(criteria3, payload, ourListenerServerBase); Subscription subscription2 = makeActiveSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(subscription2, null, false);
Subscription subscription3 = makeActiveSubscription(criteria3, payload, ourListenerServerBase);
sendSubscription(subscription3, null, false);
assertEquals(3, mySubscriptionRegistry.size()); assertEquals(3, mySubscriptionRegistry.size());
@ -121,5 +148,127 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
} }
@Test
public void testSubscriptionAndResourceOnTheSamePartitionMatch() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(0);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", requestPartitionId);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnTheSamePartitionMatchPart2() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", requestPartitionId);
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatch() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(0));
mySubscriptionResourceNotMatched.awaitExpected();
}
@Test
public void testSubscriptionAndResourceOnDiffPartitionNotMatchPart2() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(0);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionId(1));
mySubscriptionResourceNotMatched.awaitExpected();
}
@Test
public void testSubscriptionOnOnePartitionMatchResourceOnMultiplePartitions() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
ourObservationListener.setExpectedCount(1);
mySubscriptionResourceMatched.setExpectedCount(1);
List<Integer> partitionId = Collections.synchronizedList(Lists.newArrayList(0, 1, 2));
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(partitionId));
mySubscriptionResourceMatched.awaitExpected();
ourObservationListener.awaitExpected();
}
@Test
public void testSubscriptionOnOnePartitionDoNotMatchResourceOnMultiplePartitions() throws InterruptedException {
myPartitionSettings.setPartitioningEnabled(true);
String payload = "application/fhir+json";
String code = "1000000050";
String criteria = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
RequestPartitionId requestPartitionId = RequestPartitionId.fromPartitionId(1);
Subscription subscription = makeActiveSubscription(criteria, payload, ourListenerServerBase);
mockSubscriptionRead(requestPartitionId, subscription);
sendSubscription(subscription, requestPartitionId, true);
mySubscriptionResourceNotMatched.setExpectedCount(1);
List<Integer> partitionId = Collections.synchronizedList(Lists.newArrayList(0, 2));
sendObservation(code, "SNOMED-CT", RequestPartitionId.fromPartitionIds(partitionId));
mySubscriptionResourceNotMatched.awaitExpected();
}
private void mockSubscriptionRead(RequestPartitionId theRequestPartitionId, Subscription subscription) {
Subscription modifiedSubscription = subscription.copy();
// the original partition info was the request info, but we need the actual storage partition.
modifiedSubscription.setUserData(Constants.RESOURCE_PARTITION_ID, theRequestPartitionId);
Mockito.when(myMockSubscriptionDao.read(eq(subscription.getIdElement()), any())).thenReturn(modifiedSubscription);
}
} }

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.model.config.PartitionSettings; import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.ModelConfig; import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig; import ca.uhn.fhir.jpa.searchparam.config.SearchParamConfig;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider; import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamProvider;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory; import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
@ -53,6 +54,8 @@ public class SubscriptionSubmitInterceptorLoaderTest {
private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; private SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor;
@MockBean @MockBean
private IResourceVersionSvc myResourceVersionSvc; private IResourceVersionSvc myResourceVersionSvc;
@MockBean
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
/** /**
* It should be possible to run only the {@link SubscriptionSubmitterConfig} without the * It should be possible to run only the {@link SubscriptionSubmitterConfig} without the

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -7,7 +7,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId> <artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId> <artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId> <artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId> <artifactId>hapi-fhir-spring-boot</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</parent> </parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId> <artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -57,6 +57,10 @@ public class SystemRequestDetails extends RequestDetails {
super(new MyInterceptorBroadcaster()); super(new MyInterceptorBroadcaster());
} }
public static SystemRequestDetails forAllPartition(){
return new SystemRequestDetails().setRequestPartitionId(RequestPartitionId.allPartitions());
}
private ListMultimap<String, String> myHeaders; private ListMultimap<String, String> myHeaders;
/** /**

View File

@ -22,15 +22,15 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.model.dstu2.composite.CodingDt;
import ca.uhn.fhir.model.dstu2.resource.Subscription; import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.util.HapiExtensions; import ca.uhn.fhir.util.HapiExtensions;
import org.hl7.fhir.dstu3.model.Coding;
import org.hl7.fhir.exceptions.FHIRException; import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseCoding; import org.hl7.fhir.instance.model.api.IBaseCoding;
import org.hl7.fhir.instance.model.api.IBaseHasExtensions; import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
@ -46,7 +46,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -124,6 +123,7 @@ public class SubscriptionCanonicalizer {
if (status != null) { if (status != null) {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode())); retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode()));
} }
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setChannelType(getChannelType(theSubscription)); retVal.setChannelType(getChannelType(theSubscription));
retVal.setCriteriaString(subscription.getCriteria()); retVal.setCriteriaString(subscription.getCriteria());
retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); retVal.setEndpointUrl(subscription.getChannel().getEndpoint());
@ -232,6 +232,7 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadString(subscription.getChannel().getPayload()); retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA)); retVal.setPayloadSearchCriteria(getExtensionString(subscription, HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA));
retVal.setTags(extractTags(subscription)); retVal.setTags(extractTags(subscription));
setPartitionIdOnReturnValue(theSubscription, retVal);
if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) {
String from; String from;
@ -278,6 +279,7 @@ public class SubscriptionCanonicalizer {
if (status != null) { if (status != null) {
retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode())); retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(status.toCode()));
} }
setPartitionIdOnReturnValue(theSubscription, retVal);
retVal.setChannelType(getChannelType(subscription)); retVal.setChannelType(getChannelType(subscription));
retVal.setCriteriaString(getCriteria(theSubscription)); retVal.setCriteriaString(getCriteria(theSubscription));
retVal.setEndpointUrl(subscription.getEndpoint()); retVal.setEndpointUrl(subscription.getEndpoint());
@ -325,6 +327,13 @@ public class SubscriptionCanonicalizer {
return retVal; return retVal;
} }
private void setPartitionIdOnReturnValue(IBaseResource theSubscription, CanonicalSubscription retVal) {
RequestPartitionId requestPartitionId = (RequestPartitionId) theSubscription.getUserData(Constants.RESOURCE_PARTITION_ID);
if (requestPartitionId != null) {
retVal.setPartitionId(requestPartitionId.getFirstPartitionIdOrNull());
}
}
private String getExtensionString(IBaseHasExtensions theBase, String theUrl) { private String getExtensionString(IBaseHasExtensions theBase, String theUrl) {
return theBase return theBase
.getExtension() .getExtension()

View File

@ -43,7 +43,6 @@ import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class CanonicalSubscription implements Serializable, Cloneable, IModelJson { public class CanonicalSubscription implements Serializable, Cloneable, IModelJson {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@JsonProperty("id") @JsonProperty("id")
@ -72,6 +71,8 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
private Map<String, String> myTags; private Map<String, String> myTags;
@JsonProperty("payloadSearchCriteria") @JsonProperty("payloadSearchCriteria")
private String myPayloadSearchCriteria; private String myPayloadSearchCriteria;
@JsonProperty("partitionId")
private Integer myPartitionId;
/** /**
* Constructor * Constructor
@ -89,7 +90,7 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
} }
/** /**
* For now we're using the R4 TriggerDefinition, but this * For now, we're using the R4 TriggerDefinition, but this
* may change in the future when things stabilize * may change in the future when things stabilize
*/ */
public void addTrigger(CanonicalEventDefinition theTrigger) { public void addTrigger(CanonicalEventDefinition theTrigger) {
@ -159,9 +160,9 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
public String getChannelExtension(String theUrl) { public String getChannelExtension(String theUrl) {
String retVal = null; String retVal = null;
List<String> strings = myChannelExtensions.get(theUrl); List<String> channelExtensions = myChannelExtensions.get(theUrl);
if (strings != null && strings.isEmpty() == false) { if (channelExtensions != null && !channelExtensions.isEmpty()) {
retVal = strings.get(0); retVal = channelExtensions.get(0);
} }
return retVal; return retVal;
} }
@ -227,8 +228,16 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
myStatus = theStatus; myStatus = theStatus;
} }
public Integer getRequestPartitionId() {
return myPartitionId;
}
public void setPartitionId(Integer thePartitionId) {
myPartitionId = thePartitionId;
}
/** /**
* For now we're using the R4 triggerdefinition, but this * For now, we're using the R4 triggerdefinition, but this
* may change in the future when things stabilize * may change in the future when things stabilize
*/ */
public CanonicalEventDefinition getTrigger() { public CanonicalEventDefinition getTrigger() {
@ -325,7 +334,7 @@ public class CanonicalSubscription implements Serializable, Cloneable, IModelJso
private String mySubjectTemplate; private String mySubjectTemplate;
/** /**
* Construcor * Constructor
*/ */
public EmailDetails() { public EmailDetails() {
super(); super();

View File

@ -22,9 +22,12 @@ package ca.uhn.fhir.jpa.subscription.model;
import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage; import ca.uhn.fhir.rest.server.messaging.json.BaseJsonMessage;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
public class ResourceDeliveryJsonMessage extends BaseJsonMessage<ResourceDeliveryMessage> { public class ResourceDeliveryJsonMessage extends BaseJsonMessage<ResourceDeliveryMessage> {
private static final ObjectMapper ourObjectMapper = new ObjectMapper().registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());
@JsonProperty("payload") @JsonProperty("payload")
private ResourceDeliveryMessage myPayload; private ResourceDeliveryMessage myPayload;
@ -58,4 +61,12 @@ public class ResourceDeliveryJsonMessage extends BaseJsonMessage<ResourceDeliver
.append("myPayload", myPayload) .append("myPayload", myPayload)
.toString(); .toString();
} }
public static ResourceDeliveryJsonMessage fromJson(String theJson) throws JsonProcessingException {
return ourObjectMapper.readValue(theJson, ResourceDeliveryJsonMessage.class);
}
public String asJson() throws JsonProcessingException {
return ourObjectMapper.writeValueAsString(this);
}
} }

View File

@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.subscription.model;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceMessage;
@ -38,6 +39,8 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
@JsonProperty("canonicalSubscription") @JsonProperty("canonicalSubscription")
private CanonicalSubscription mySubscription; private CanonicalSubscription mySubscription;
@JsonProperty("partitionId")
private RequestPartitionId myPartitionId;
@JsonProperty("payload") @JsonProperty("payload")
private String myPayloadString; private String myPayloadString;
@JsonProperty("payloadId") @JsonProperty("payloadId")
@ -50,6 +53,7 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
*/ */
public ResourceDeliveryMessage() { public ResourceDeliveryMessage() {
super(); super();
myPartitionId = RequestPartitionId.defaultPartition();
} }
public IBaseResource getPayload(FhirContext theCtx) { public IBaseResource getPayload(FhirContext theCtx) {
@ -110,6 +114,14 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
} }
} }
public RequestPartitionId getRequestPartitionId() {
return myPartitionId;
}
public void setPartitionId(RequestPartitionId thePartitionId) {
myPartitionId = thePartitionId;
}
@Override @Override
public String toString() { public String toString() {
return new ToStringBuilder(this) return new ToStringBuilder(this)
@ -117,6 +129,7 @@ public class ResourceDeliveryMessage extends BaseResourceMessage implements IRes
.append("myPayloadString", myPayloadString) .append("myPayloadString", myPayloadString)
.append("myPayload", myPayloadDecoded) .append("myPayload", myPayloadDecoded)
.append("myPayloadId", myPayloadId) .append("myPayloadId", myPayloadId)
.append("myPartitionId", myPartitionId)
.append("myOperationType", getOperationType()) .append("myOperationType", getOperationType())
.toString(); .toString();
} }

View File

@ -21,14 +21,13 @@ package ca.uhn.fhir.jpa.subscription.model;
*/ */
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage; import ca.uhn.fhir.rest.server.messaging.BaseResourceModifiedMessage;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import javax.annotation.Nullable;
/** /**
* Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage * Most of this class has been moved to ResourceModifiedMessage in the hapi-fhir-server project, for a reusable channel ResourceModifiedMessage
* that doesn't require knowledge of subscriptions. * that doesn't require knowledge of subscriptions.
@ -42,6 +41,8 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
@JsonProperty(value = "subscriptionId", required = false) @JsonProperty(value = "subscriptionId", required = false)
private String mySubscriptionId; private String mySubscriptionId;
@JsonProperty(value = "partitionId", required = false)
private RequestPartitionId myPartitionId;
/** /**
@ -53,10 +54,17 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) { public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theResource, OperationTypeEnum theOperationType) {
super(theFhirContext, theResource, theOperationType); super(theFhirContext, theResource, theOperationType);
myPartitionId = RequestPartitionId.defaultPartition();
} }
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) { public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest) {
super(theFhirContext, theNewResource, theOperationType, theRequest); super(theFhirContext, theNewResource, theOperationType, theRequest);
myPartitionId = RequestPartitionId.defaultPartition();
}
public ResourceModifiedMessage(FhirContext theFhirContext, IBaseResource theNewResource, OperationTypeEnum theOperationType, RequestDetails theRequest, RequestPartitionId theRequestPartitionId) {
super(theFhirContext, theNewResource, theOperationType, theRequest);
myPartitionId = theRequestPartitionId;
} }
@ -68,6 +76,14 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
mySubscriptionId = theSubscriptionId; mySubscriptionId = theSubscriptionId;
} }
public RequestPartitionId getPartitionId() {
return myPartitionId;
}
public void setPartitionId(RequestPartitionId thePartitionId) {
myPartitionId = thePartitionId;
}
@Override @Override
public String toString() { public String toString() {
@ -75,6 +91,7 @@ public class ResourceModifiedMessage extends BaseResourceModifiedMessage {
.append("operationType", myOperationType) .append("operationType", myOperationType)
.append("subscriptionId", mySubscriptionId) .append("subscriptionId", mySubscriptionId)
.append("payloadId", myPayloadId) .append("payloadId", myPayloadId)
.append("partitionId", myPartitionId)
.toString(); .toString();
} }
} }

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId> <artifactId>hapi-deployable-pom</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath> <relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
@ -58,37 +58,37 @@
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-dstu3</artifactId> <artifactId>hapi-fhir-structures-dstu3</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-hl7org-dstu2</artifactId> <artifactId>hapi-fhir-structures-hl7org-dstu2</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-r4</artifactId> <artifactId>hapi-fhir-structures-r4</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-structures-r5</artifactId> <artifactId>hapi-fhir-structures-r5</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-validation-resources-dstu2</artifactId> <artifactId>hapi-fhir-validation-resources-dstu2</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-validation-resources-dstu3</artifactId> <artifactId>hapi-fhir-validation-resources-dstu3</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-validation-resources-r4</artifactId> <artifactId>hapi-fhir-validation-resources-r4</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.velocity</groupId> <groupId>org.apache.velocity</groupId>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@ -6,7 +6,7 @@
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<name>HAPI-FHIR</name> <name>HAPI-FHIR</name>
<description>An open-source implementation of the FHIR specification in Java.</description> <description>An open-source implementation of the FHIR specification in Java.</description>
<url>https://hapifhir.io</url> <url>https://hapifhir.io</url>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>ca.uhn.hapi.fhir</groupId> <groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId> <artifactId>hapi-fhir</artifactId>
<version>5.7.0-PRE6-SNAPSHOT</version> <version>5.7.0-PRE7-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath> <relativePath>../../pom.xml</relativePath>
</parent> </parent>