4523 subscription sources (#4543)
* fix + test * minor fix * Addressing suggestion * Minor changes * Revert changes, add new test clas, new test consumer for queus * Finish implementation * fix compilation * Remove unused method and test from the previous solution * fix + test * minor fix * Addressing suggestion * Minor changes * Revert changes, add new test clas, new test consumer for queus * Finish implementation * fix compilation * Remove unused method and test from the previous solution * More Renaming * fix compilation failure * Revert compilation fix and merge master * Fix to pipeline test failures * Addressing suggestions --------- Co-authored-by: qingyixia <cherry.xia@smilecdr.com>
This commit is contained in:
parent
73235f1e09
commit
607f27b686
|
@ -25,6 +25,7 @@ import ca.uhn.fhir.context.BaseRuntimeElementCompositeDefinition;
|
|||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.context.FhirVersionEnum;
|
||||
import ca.uhn.fhir.i18n.Msg;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.hl7.fhir.instance.model.api.IBase;
|
||||
import org.hl7.fhir.instance.model.api.IBaseExtension;
|
||||
import org.hl7.fhir.instance.model.api.IBaseHasExtensions;
|
||||
|
@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.defaultString;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
||||
|
||||
public class MetaUtil {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(MetaUtil.class);
|
||||
|
@ -44,6 +48,11 @@ public class MetaUtil {
|
|||
// non-instantiable
|
||||
}
|
||||
|
||||
public static String cleanProvenanceSourceUriOrEmpty(String theProvenanceSourceUri) {
|
||||
String sanitizedProvenance = defaultString(theProvenanceSourceUri);
|
||||
return StringUtils.substringBefore(sanitizedProvenance, "#");
|
||||
}
|
||||
|
||||
public static String getSource(FhirContext theContext, IBaseMetaType theMeta) {
|
||||
if (theContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.R4)) {
|
||||
return getSourceR4Plus(theContext, theMeta);
|
||||
|
@ -80,6 +89,17 @@ public class MetaUtil {
|
|||
return retVal;
|
||||
}
|
||||
|
||||
public static <R extends IBaseResource> void populateResourceSource(FhirContext theFhirContext, String theProvenanceSourceUri, String theProvenanceRequestId, R theRetVal) {
|
||||
String sourceString = cleanProvenanceSourceUriOrEmpty(theProvenanceSourceUri);
|
||||
if (isNotBlank(theProvenanceRequestId)){
|
||||
sourceString = sourceString + "#" + theProvenanceRequestId;
|
||||
}
|
||||
|
||||
if (isNotBlank(sourceString)){
|
||||
setSource(theFhirContext, theRetVal, sourceString);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value for <code>Resource.meta.source</code> for R4+ resources, and places the value in
|
||||
* an extension on <code>Resource.meta</code>
|
||||
|
@ -119,4 +139,5 @@ public class MetaUtil {
|
|||
sourceElement.setValueAsString(theValue);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
type: fix
|
||||
issue: 4523
|
||||
title: "Previously, meta source field in subscription messages was inconsistently populated regarding different requests.
|
||||
Now, this has been fixed and meta source will be included in all subscription messages."
|
|
@ -1289,10 +1289,12 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
}
|
||||
|
||||
String requestId = getRequestId(theRequest, source);
|
||||
source = cleanProvenanceSourceUri(source);
|
||||
source = MetaUtil.cleanProvenanceSourceUriOrEmpty(source);
|
||||
|
||||
boolean haveSource = isNotBlank(source) && myStorageSettings.getStoreMetaSourceInformation().isStoreSourceUri();
|
||||
boolean haveRequestId = isNotBlank(requestId) && myStorageSettings.getStoreMetaSourceInformation().isStoreRequestId();
|
||||
boolean shouldStoreSource = myStorageSettings.getStoreMetaSourceInformation().isStoreSourceUri();
|
||||
boolean shouldStoreRequestId = myStorageSettings.getStoreMetaSourceInformation().isStoreRequestId();
|
||||
boolean haveSource = isNotBlank(source) && shouldStoreSource;
|
||||
boolean haveRequestId = isNotBlank(requestId) && shouldStoreRequestId;
|
||||
if (haveSource || haveRequestId) {
|
||||
ResourceHistoryProvenanceEntity provenance = new ResourceHistoryProvenanceEntity();
|
||||
provenance.setResourceHistoryTable(historyEntry);
|
||||
|
@ -1304,6 +1306,10 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
if (haveSource) {
|
||||
provenance.setSourceUri(source);
|
||||
}
|
||||
if (theResource != null) {
|
||||
MetaUtil.populateResourceSource(myFhirContext, shouldStoreSource ? source : null, shouldStoreRequestId ? requestId : null , theResource);
|
||||
}
|
||||
|
||||
myEntityManager.persist(provenance);
|
||||
}
|
||||
}
|
||||
|
@ -1586,16 +1592,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
|
|||
return new MemoryCacheService.TagDefinitionCacheKey(theTagType, theScheme, theTerm);
|
||||
}
|
||||
|
||||
static String cleanProvenanceSourceUri(String theProvenanceSourceUri) {
|
||||
if (isNotBlank(theProvenanceSourceUri)) {
|
||||
int hashIndex = theProvenanceSourceUri.indexOf('#');
|
||||
if (hashIndex != -1) {
|
||||
theProvenanceSourceUri = theProvenanceSourceUri.substring(0, hashIndex);
|
||||
}
|
||||
}
|
||||
return defaultString(theProvenanceSourceUri);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static String parseContentTextIntoWords(FhirContext theContext, IBaseResource theResource) {
|
||||
|
||||
|
|
|
@ -70,7 +70,6 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.cleanProvenanceSourceUri;
|
||||
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.decodeResource;
|
||||
import static org.apache.commons.lang3.StringUtils.defaultString;
|
||||
import static org.apache.commons.lang3.StringUtils.isNotBlank;
|
||||
|
@ -79,7 +78,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
public static final LenientErrorHandler LENIENT_ERROR_HANDLER = new LenientErrorHandler(false).setErrorOnInvalidValue(false);
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(JpaStorageResourceParser.class);
|
||||
@Autowired
|
||||
private FhirContext myContext;
|
||||
private FhirContext myFhirContext;
|
||||
@Autowired
|
||||
private JpaStorageSettings myStorageSettings;
|
||||
@Autowired
|
||||
|
@ -91,7 +90,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
|
||||
@Override
|
||||
public IBaseResource toResource(IBasePersistedResource theEntity, boolean theForHistoryOperation) {
|
||||
RuntimeResourceDefinition type = myContext.getResourceDefinition(theEntity.getResourceType());
|
||||
RuntimeResourceDefinition type = myFhirContext.getResourceDefinition(theEntity.getResourceType());
|
||||
Class<? extends IBaseResource> resourceType = type.getImplementingClass();
|
||||
return toResource(resourceType, (IBaseResourceEntity) theEntity, null, theForHistoryOperation);
|
||||
}
|
||||
|
@ -211,7 +210,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
retVal = populateResourceMetadata(theEntity, theForHistoryOperation, tagList, version, retVal);
|
||||
|
||||
// 6. Handle source (provenance)
|
||||
populateResourceSource(provenanceSourceUri, provenanceRequestId, retVal);
|
||||
MetaUtil.populateResourceSource(myFhirContext, provenanceSourceUri, provenanceRequestId, retVal);
|
||||
|
||||
// 7. Add partition information
|
||||
populateResourcePartitionInformation(theEntity, retVal);
|
||||
|
@ -231,16 +230,6 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
}
|
||||
}
|
||||
|
||||
private <R extends IBaseResource> void populateResourceSource(String provenanceSourceUri, String provenanceRequestId, R retVal) {
|
||||
if (isNotBlank(provenanceRequestId) || isNotBlank(provenanceSourceUri)) {
|
||||
String sourceString = cleanProvenanceSourceUri(provenanceSourceUri)
|
||||
+ (isNotBlank(provenanceRequestId) ? "#" : "")
|
||||
+ defaultString(provenanceRequestId);
|
||||
|
||||
MetaUtil.setSource(myContext, retVal, sourceString);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <R extends IBaseResource> R parseResource(IBaseResourceEntity theEntity, ResourceEncodingEnum resourceEncoding, String decodedResourceText, Class<R> resourceType) {
|
||||
R retVal;
|
||||
|
@ -253,7 +242,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
} catch (Exception e) {
|
||||
StringBuilder b = new StringBuilder();
|
||||
b.append("Failed to parse database resource[");
|
||||
b.append(myContext.getResourceType(resourceType));
|
||||
b.append(myFhirContext.getResourceType(resourceType));
|
||||
b.append("/");
|
||||
b.append(theEntity.getIdDt().getIdPart());
|
||||
b.append(" (pid ");
|
||||
|
@ -269,7 +258,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
|
||||
} else {
|
||||
|
||||
retVal = (R) myContext.getResourceDefinition(theEntity.getResourceType()).newInstance();
|
||||
retVal = (R) myFhirContext.getResourceDefinition(theEntity.getResourceType()).newInstance();
|
||||
|
||||
}
|
||||
return retVal;
|
||||
|
@ -279,12 +268,12 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
private <R extends IBaseResource> Class<R> determineTypeToParse(Class<R> theResourceType, @Nullable Collection<? extends BaseTag> tagList) {
|
||||
Class<R> resourceType = theResourceType;
|
||||
if (tagList != null) {
|
||||
if (myContext.hasDefaultTypeForProfile()) {
|
||||
if (myFhirContext.hasDefaultTypeForProfile()) {
|
||||
for (BaseTag nextTag : tagList) {
|
||||
if (nextTag.getTag().getTagType() == TagTypeEnum.PROFILE) {
|
||||
String profile = nextTag.getTag().getCode();
|
||||
if (isNotBlank(profile)) {
|
||||
Class<? extends IBaseResource> newType = myContext.getDefaultTypeForProfile(profile);
|
||||
Class<? extends IBaseResource> newType = myFhirContext.getDefaultTypeForProfile(profile);
|
||||
if (newType != null && theResourceType.isAssignableFrom(newType)) {
|
||||
ourLog.debug("Using custom type {} for profile: {}", newType.getName(), profile);
|
||||
resourceType = (Class<R>) newType;
|
||||
|
@ -315,7 +304,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
private <R extends IResource> R populateResourceMetadataHapi(IBaseResourceEntity theEntity, @Nullable Collection<? extends BaseTag> theTagList, boolean theForHistoryOperation, R res, Long theVersion) {
|
||||
R retVal = res;
|
||||
if (theEntity.getDeleted() != null) {
|
||||
res = (R) myContext.getResourceDefinition(res).newInstance();
|
||||
res = (R) myFhirContext.getResourceDefinition(res).newInstance();
|
||||
retVal = res;
|
||||
ResourceMetadataKeyEnum.DELETED_AT.put(res, new InstantDt(theEntity.getDeleted()));
|
||||
if (theForHistoryOperation) {
|
||||
|
@ -352,7 +341,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
profiles.add(new IdDt(next.getTag().getCode()));
|
||||
break;
|
||||
case SECURITY_LABEL:
|
||||
IBaseCoding secLabel = (IBaseCoding) myContext.getVersion().newCodingDt();
|
||||
IBaseCoding secLabel = (IBaseCoding) myFhirContext.getVersion().newCodingDt();
|
||||
secLabel.setSystem(next.getTag().getSystem());
|
||||
secLabel.setCode(next.getTag().getCode());
|
||||
secLabel.setDisplay(next.getTag().getDisplay());
|
||||
|
@ -382,7 +371,7 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
private <R extends IBaseResource> R populateResourceMetadataRi(IBaseResourceEntity theEntity, @Nullable Collection<? extends BaseTag> theTagList, boolean theForHistoryOperation, IAnyResource res, Long theVersion) {
|
||||
R retVal = (R) res;
|
||||
if (theEntity.getDeleted() != null) {
|
||||
res = (IAnyResource) myContext.getResourceDefinition(res).newInstance();
|
||||
res = (IAnyResource) myFhirContext.getResourceDefinition(res).newInstance();
|
||||
retVal = (R) res;
|
||||
ResourceMetadataKeyEnum.DELETED_AT.put(res, new InstantDt(theEntity.getDeleted()));
|
||||
if (theForHistoryOperation) {
|
||||
|
@ -441,8 +430,8 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
@Override
|
||||
public void updateResourceMetadata(IBaseResourceEntity theEntitySource, IBaseResource theResourceTarget) {
|
||||
IIdType id = theEntitySource.getIdDt();
|
||||
if (myContext.getVersion().getVersion().isRi()) {
|
||||
id = myContext.getVersion().newIdType().setValue(id.getValue());
|
||||
if (myFhirContext.getVersion().getVersion().isRi()) {
|
||||
id = myFhirContext.getVersion().newIdType().setValue(id.getValue());
|
||||
}
|
||||
|
||||
if (id.hasResourceType() == false) {
|
||||
|
@ -462,8 +451,8 @@ public class JpaStorageResourceParser implements IJpaStorageResourceParser {
|
|||
|
||||
private FhirContext getContext(FhirVersionEnum theVersion) {
|
||||
Validate.notNull(theVersion, "theVersion must not be null");
|
||||
if (theVersion == myContext.getVersion().getVersion()) {
|
||||
return myContext;
|
||||
if (theVersion == myFhirContext.getVersion().getVersion()) {
|
||||
return myFhirContext;
|
||||
}
|
||||
return FhirContext.forCached(theVersion);
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
|
|||
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.Bundle;
|
||||
import org.hl7.fhir.r4.model.IdType;
|
||||
import org.hl7.fhir.r4.model.Patient;
|
||||
|
|
|
@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
|||
import ca.uhn.fhir.jpa.util.MemoryCacheService;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import ca.uhn.fhir.util.AsyncUtil;
|
||||
import ca.uhn.fhir.util.MetaUtil;
|
||||
import ca.uhn.fhir.util.ThreadPoolUtil;
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.Logger;
|
||||
|
@ -387,9 +388,10 @@ public class BaseHapiFhirDaoTest {
|
|||
|
||||
@Test
|
||||
public void cleanProvenanceSourceUri() {
|
||||
assertEquals("", BaseHapiFhirDao.cleanProvenanceSourceUri(null));
|
||||
assertEquals("abc", BaseHapiFhirDao.cleanProvenanceSourceUri("abc"));
|
||||
assertEquals("abc", BaseHapiFhirDao.cleanProvenanceSourceUri("abc#def"));
|
||||
assertEquals("abc", BaseHapiFhirDao.cleanProvenanceSourceUri("abc#def#ghi"));
|
||||
assertEquals("", MetaUtil.cleanProvenanceSourceUriOrEmpty(null));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def"));
|
||||
assertEquals("abc", MetaUtil.cleanProvenanceSourceUriOrEmpty("abc#def#ghi"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,10 +7,12 @@ import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
|
|||
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
|
||||
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
|
||||
import ca.uhn.fhir.rest.api.MethodOutcome;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension;
|
||||
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
|
||||
import ca.uhn.fhir.test.utilities.server.TransactionCapturingProviderExtension;
|
||||
import ca.uhn.fhir.util.BundleUtil;
|
||||
import com.apicatalog.jsonld.StringUtils;
|
||||
import net.ttddyy.dsproxy.QueryCount;
|
||||
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
|
@ -127,20 +129,26 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
|
|||
if (theExtension != null) {
|
||||
subscription.getChannel().addExtension(theExtension);
|
||||
}
|
||||
|
||||
MethodOutcome methodOutcome;
|
||||
if (id != null) {
|
||||
subscription.setId(id);
|
||||
methodOutcome = myClient.update().resource(subscription).execute();
|
||||
} else {
|
||||
methodOutcome = myClient.create().resource(subscription).execute();
|
||||
}
|
||||
subscription.setId(methodOutcome.getId().getIdPart());
|
||||
mySubscriptionIds.add(methodOutcome.getId());
|
||||
|
||||
subscription = postOrPutSubscription(subscription);
|
||||
return subscription;
|
||||
}
|
||||
|
||||
protected Subscription postOrPutSubscription(IBaseResource theSubscription) {
|
||||
MethodOutcome methodOutcome;
|
||||
if (theSubscription.getIdElement().isEmpty()) {
|
||||
methodOutcome = myClient.create().resource(theSubscription).execute();
|
||||
} else {
|
||||
methodOutcome = myClient.update().resource(theSubscription).execute();
|
||||
}
|
||||
theSubscription.setId(methodOutcome.getId().getIdPart());
|
||||
mySubscriptionIds.add(methodOutcome.getId());
|
||||
return (Subscription) theSubscription;
|
||||
}
|
||||
|
||||
protected Subscription newSubscription(String theCriteria, String thePayload) {
|
||||
Subscription subscription = new Subscription();
|
||||
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
|
||||
|
@ -166,11 +174,21 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
|
|||
|
||||
|
||||
protected Observation sendObservation(String theCode, String theSystem) {
|
||||
return sendObservation(theCode, theSystem, null, null);
|
||||
}
|
||||
|
||||
protected Observation sendObservation(String theCode, String theSystem, String theSource, String theRequestId) {
|
||||
Observation observation = createBaseObservation(theCode, theSystem);
|
||||
if (StringUtils.isNotBlank(theSource)) {
|
||||
observation.getMeta().setSource(theSource);
|
||||
}
|
||||
|
||||
IIdType id = myObservationDao.create(observation).getId();
|
||||
SystemRequestDetails systemRequestDetails = new SystemRequestDetails();
|
||||
if (StringUtils.isNotBlank(theRequestId)) {
|
||||
systemRequestDetails.setRequestId(theRequestId);
|
||||
}
|
||||
IIdType id = myObservationDao.create(observation, systemRequestDetails).getId();
|
||||
observation.setId(id);
|
||||
|
||||
return observation;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
package ca.uhn.fhir.jpa.subscription.message;
|
||||
|
||||
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
|
||||
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
|
||||
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.CodeableConcept;
|
||||
import org.hl7.fhir.r4.model.Coding;
|
||||
import org.hl7.fhir.r4.model.Observation;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
* Test the rest-hook subscriptions
|
||||
*/
|
||||
public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
|
||||
@Autowired
|
||||
private SubscriptionChannelFactory myChannelFactory ;
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(MessageSubscriptionR4Test.class);
|
||||
private TestQueueConsumerHandler<ResourceModifiedJsonMessage> handler;
|
||||
|
||||
@Autowired
|
||||
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
|
||||
|
||||
@AfterEach
|
||||
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
|
||||
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
|
||||
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
|
||||
myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges());
|
||||
}
|
||||
@BeforeEach
|
||||
public void beforeRegisterRestHookListener() {
|
||||
mySubscriptionTestUtil.registerMessageInterceptor();
|
||||
|
||||
IChannelReceiver receiver = myChannelFactory.newMatchingReceivingChannel("my-queue-name", new ChannelConsumerSettings());
|
||||
handler = new TestQueueConsumerHandler();
|
||||
receiver.subscribe(handler);
|
||||
}
|
||||
|
||||
private Subscription createObservationSubscription() {
|
||||
Subscription subscription = new Subscription();
|
||||
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
|
||||
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
|
||||
subscription.setCriteria("[Observation]");
|
||||
|
||||
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
|
||||
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
|
||||
channel.setPayload("application/fhir+json");
|
||||
channel.setEndpoint("channel:my-queue-name");
|
||||
|
||||
subscription.setChannel(channel);
|
||||
postOrPutSubscription(subscription);
|
||||
return subscription;
|
||||
}
|
||||
|
||||
private static Stream<Arguments> sourceTypes() {
|
||||
return Stream.of(
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.SOURCE_URI_AND_REQUEST_ID, "explicit-source", null, "explicit-source"),
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.REQUEST_ID, null, null, null),
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.SOURCE_URI, "explicit-source", "request-id", "explicit-source"),
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.SOURCE_URI_AND_REQUEST_ID, "explicit-source", "request-id", "explicit-source#request-id"),
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.SOURCE_URI, "explicit-source", null, "explicit-source"),
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.SOURCE_URI_AND_REQUEST_ID, null, "request-id", "#request-id"),
|
||||
Arguments.of(JpaStorageSettings.StoreMetaSourceInformationEnum.REQUEST_ID, "explicit-source", "request-id", "#request-id")
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("sourceTypes")
|
||||
public void testCreateUpdateAndPatchRetainCorrectSourceThroughDelivery(JpaStorageSettings.StoreMetaSourceInformationEnum theStorageStyle, String theExplicitSource, String theRequestId, String theExpectedSourceValue) throws Exception {
|
||||
myStorageSettings.setStoreMetaSourceInformation(theStorageStyle);
|
||||
createObservationSubscription();
|
||||
|
||||
waitForActivatedSubscriptionCount(1);
|
||||
|
||||
Observation obs = sendObservation("zoop", "SNOMED-CT", theExplicitSource, theRequestId);
|
||||
|
||||
//Quick validation source stored.
|
||||
Observation readObs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
|
||||
assertThat(readObs.getMeta().getSource(), is(equalTo(theExpectedSourceValue)));
|
||||
|
||||
// Should see 1 subscription notification
|
||||
waitForQueueToDrain();
|
||||
|
||||
//Should receive at our queue receiver
|
||||
Observation receivedObs = fetchSingleObservationFromSubscriptionTerminalEndpoint();
|
||||
assertThat(receivedObs.getMeta().getSource(), is(equalTo(theExpectedSourceValue)));
|
||||
}
|
||||
|
||||
private Observation fetchSingleObservationFromSubscriptionTerminalEndpoint() {
|
||||
assertThat(handler.getMessages().size(), is(equalTo(1)));
|
||||
ResourceModifiedJsonMessage resourceModifiedJsonMessage = handler.getMessages().get(0);
|
||||
ResourceModifiedMessage payload = resourceModifiedJsonMessage.getPayload();
|
||||
String payloadString = payload.getPayloadString();
|
||||
IBaseResource resource = myFhirContext.newJsonParser().parseResource(payloadString);
|
||||
Observation receivedObs = (Observation) resource;
|
||||
handler.clearMessages();
|
||||
return receivedObs;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package ca.uhn.fhir.jpa.subscription.message;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.slf4j.LoggerFactory.getLogger;
|
||||
|
||||
public class TestQueueConsumerHandler<T> implements MessageHandler {
|
||||
private static final Logger ourLog = getLogger(TestQueueConsumerHandler.class);
|
||||
List<T> myMessages;
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
getMessages().add((T)message);
|
||||
ourLog.info("Received message: {}", message);
|
||||
}
|
||||
public void clearMessages() {
|
||||
myMessages.clear();;
|
||||
}
|
||||
|
||||
public List<T> getMessages() {
|
||||
if (myMessages == null) {
|
||||
myMessages = new ArrayList<>();
|
||||
}
|
||||
return myMessages;
|
||||
}
|
||||
}
|
|
@ -135,7 +135,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
|||
* Send version 1
|
||||
*/
|
||||
|
||||
Observation obs = sendObservation(code, "SNOMED-CT");
|
||||
Observation obs = sendObservation(code, "SNOMED-CT", "http://source-system.com", null);
|
||||
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
|
||||
|
||||
// Should see 1 subscription notification
|
||||
|
@ -145,6 +145,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
|||
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
|
||||
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
|
||||
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
|
||||
assertEquals("http://source-system.com", ourObservationProvider.getStoredResources().get(0).getMeta().getSource());
|
||||
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
|
||||
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
|
||||
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
|
||||
|
@ -154,6 +155,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
|||
*/
|
||||
|
||||
obs.getIdentifierFirstRep().setSystem("foo").setValue("2");
|
||||
obs.getMeta().setSource("http://other-source");
|
||||
myObservationDao.update(obs);
|
||||
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
|
||||
|
||||
|
@ -164,6 +166,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
|
|||
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
|
||||
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
|
||||
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
|
||||
assertEquals("http://other-source", ourObservationProvider.getStoredResources().get(0).getMeta().getSource());
|
||||
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
|
||||
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
|
||||
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
|
||||
|
|
|
@ -81,6 +81,11 @@ public class SubscriptionTestUtil {
|
|||
mySubscriptionSubmitInterceptorLoader.start();
|
||||
}
|
||||
|
||||
public void registerMessageInterceptor() {
|
||||
myStorageSettings.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.MESSAGE);
|
||||
mySubscriptionSubmitInterceptorLoader.start();
|
||||
}
|
||||
|
||||
public void registerWebSocketInterceptor() {
|
||||
myStorageSettings.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.WEBSOCKET);
|
||||
mySubscriptionSubmitInterceptorLoader.start();
|
||||
|
|
Loading…
Reference in New Issue