Fix the issue of missing meta.tags in subscription message (#4576)

* Fix + Test

* Addressing comments

* Minor fix
This commit is contained in:
Qingyixia 2023-02-23 18:07:10 -05:00 committed by GitHub
parent 7827f65f2d
commit 3554e9cf91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 12 deletions

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 4575
title: "Previously, the full set of meta tags is not present in the payload of the subscription message when UPDATE the resource. Now, this issue has been fixed."

View File

@ -770,6 +770,14 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
}); });
// Update the resource to contain the old tags
allTagsOld.forEach(tag -> {
theResource.getMeta()
.addTag()
.setCode(tag.getTag().getCode())
.setSystem(tag.getTag().getSystem());
});
theEntity.setHasTags(!allTagsNew.isEmpty()); theEntity.setHasTags(!allTagsNew.isEmpty());
return !allTagsOld.equals(allTagsNew); return !allTagsOld.equals(allTagsNew);
} }

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.subscription.message; package ca.uhn.fhir.jpa.subscription.message;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.interceptor.CascadingDeleteInterceptor;
import ca.uhn.fhir.jpa.model.entity.NormalizedQuantitySearchLevel;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings; import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
@ -8,11 +10,11 @@ import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFact
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber; import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.CodeableConcept; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Coding; import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -20,18 +22,19 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
* Test the rest-hook subscriptions * Test the rest-hook subscriptions
@ -50,7 +53,9 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null); myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause(); myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges()); myStorageSettings.setTriggerSubscriptionsForNonVersioningChanges(new JpaStorageSettings().isTriggerSubscriptionsForNonVersioningChanges());
myStorageSettings.setTagStorageMode(new JpaStorageSettings().getTagStorageMode());
} }
@BeforeEach @BeforeEach
public void beforeRegisterRestHookListener() { public void beforeRegisterRestHookListener() {
mySubscriptionTestUtil.registerMessageInterceptor(); mySubscriptionTestUtil.registerMessageInterceptor();
@ -60,11 +65,11 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
receiver.subscribe(handler); receiver.subscribe(handler);
} }
private Subscription createObservationSubscription() { private Subscription createSubscriptionWithCriteria(String theCriteria) {
Subscription subscription = new Subscription(); Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria("[Observation]"); subscription.setCriteria(theCriteria);
Subscription.SubscriptionChannelComponent channel = subscription.getChannel(); Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE); channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
@ -92,7 +97,7 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
@MethodSource("sourceTypes") @MethodSource("sourceTypes")
public void testCreateUpdateAndPatchRetainCorrectSourceThroughDelivery(JpaStorageSettings.StoreMetaSourceInformationEnum theStorageStyle, String theExplicitSource, String theRequestId, String theExpectedSourceValue) throws Exception { public void testCreateUpdateAndPatchRetainCorrectSourceThroughDelivery(JpaStorageSettings.StoreMetaSourceInformationEnum theStorageStyle, String theExplicitSource, String theRequestId, String theExpectedSourceValue) throws Exception {
myStorageSettings.setStoreMetaSourceInformation(theStorageStyle); myStorageSettings.setStoreMetaSourceInformation(theStorageStyle);
createObservationSubscription(); createSubscriptionWithCriteria("[Observation]");
waitForActivatedSubscriptionCount(1); waitForActivatedSubscriptionCount(1);
@ -106,19 +111,64 @@ public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain(); waitForQueueToDrain();
//Should receive at our queue receiver //Should receive at our queue receiver
Observation receivedObs = fetchSingleObservationFromSubscriptionTerminalEndpoint(); IBaseResource resource = fetchSingleResourceFromSubscriptionTerminalEndpoint();
assertThat(resource, instanceOf(Observation.class));
Observation receivedObs = (Observation) resource;
assertThat(receivedObs.getMeta().getSource(), is(equalTo(theExpectedSourceValue))); assertThat(receivedObs.getMeta().getSource(), is(equalTo(theExpectedSourceValue)));
} }
private Observation fetchSingleObservationFromSubscriptionTerminalEndpoint() { @Test
public void testUpdateResourceRetainCorrectMetaTagsThroughDelivery() throws Exception {
myStorageSettings.setTagStorageMode(JpaStorageSettings.TagStorageModeEnum.NON_VERSIONED);
createSubscriptionWithCriteria("[Patient]");
waitForActivatedSubscriptionCount(1);
// Create Patient with two meta tags
Patient patient = new Patient();
patient.setActive(true);
patient.getMeta().addTag().setSystem("http://www.example.com/tags").setCode("tag-1");
patient.getMeta().addTag().setSystem("http://www.example.com/tags").setCode("tag-2");
IIdType id = myClient.create().resource(patient).execute().getId();
// Should see 1 subscription notification for CREATE
waitForQueueToDrain();
// Should receive two meta tags
IBaseResource resource = fetchSingleResourceFromSubscriptionTerminalEndpoint();
assertThat(resource, instanceOf(Patient.class));
Patient receivedPatient = (Patient) resource;
assertThat(receivedPatient.getMeta().getTag().size(), is(equalTo(2)));
// Update the previous Patient and add one more tag
patient = new Patient();
patient.setId(id);
patient.setActive(true);
patient.getMeta().getTag().add(new Coding().setSystem("http://www.example.com/tags").setCode("tag-3"));
myClient.update().resource(patient).execute();
waitForQueueToDrain();
// Should receive all three meta tags
List<String> expected = List.of("tag-1", "tag-2", "tag-3");
resource = fetchSingleResourceFromSubscriptionTerminalEndpoint();
receivedPatient = (Patient) resource;
List<Coding> receivedTagList = receivedPatient.getMeta().getTag();
ourLog.info(getFhirContext().newJsonParser().setPrettyPrint(true).encodeResourceToString(receivedPatient));
assertThat(receivedTagList.size(), is(equalTo(3)));
List<String> actual = receivedTagList.stream().map(t -> t.getCode()).sorted().collect(Collectors.toList());
assertTrue(expected.equals(actual));
}
private IBaseResource fetchSingleResourceFromSubscriptionTerminalEndpoint() {
assertThat(handler.getMessages().size(), is(equalTo(1))); assertThat(handler.getMessages().size(), is(equalTo(1)));
ResourceModifiedJsonMessage resourceModifiedJsonMessage = handler.getMessages().get(0); ResourceModifiedJsonMessage resourceModifiedJsonMessage = handler.getMessages().get(0);
ResourceModifiedMessage payload = resourceModifiedJsonMessage.getPayload(); ResourceModifiedMessage payload = resourceModifiedJsonMessage.getPayload();
String payloadString = payload.getPayloadString(); String payloadString = payload.getPayloadString();
IBaseResource resource = myFhirContext.newJsonParser().parseResource(payloadString); IBaseResource resource = myFhirContext.newJsonParser().parseResource(payloadString);
Observation receivedObs = (Observation) resource;
handler.clearMessages(); handler.clearMessages();
return receivedObs; return resource;
} }
} }