Revert changes, add new test clas, new test consumer for queus

This commit is contained in:
Tadgh 2023-02-09 23:36:36 -08:00
parent c056aebcf6
commit 2e9209aca0
6 changed files with 190 additions and 17 deletions

View File

@ -60,18 +60,14 @@ public class SubscriptionDeliveringMessageSubscriber extends BaseSubscriptionDel
protected void doDelivery(ResourceDeliveryMessage theSourceMessage, CanonicalSubscription theSubscription, IChannelProducer theChannelProducer, ResourceModifiedJsonMessage theWrappedMessageToSend) {
String payloadId = theSourceMessage.getPayloadId();
IBaseResource payloadResource = null;
if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
payloadResource = createDeliveryBundleForPayloadSearchCriteria(theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
} else if (! theWrappedMessageToSend.getPayload().getPayloadString().contains(HapiExtensions.EXT_META_SOURCE)){
payloadResource = updateDeliveryResourceWithMetaSource(theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
}
if (payloadResource != null) {
if (isNotBlank(theSubscription.getPayloadSearchCriteria())) {
IBaseResource payloadResource = createDeliveryBundleForPayloadSearchCriteria(theSubscription, theWrappedMessageToSend.getPayload().getPayload(myFhirContext));
ResourceModifiedJsonMessage newWrappedMessageToSend = convertDeliveryMessageToResourceModifiedMessage(theSourceMessage, payloadResource);
theWrappedMessageToSend.setPayload(newWrappedMessageToSend.getPayload());
payloadId = payloadResource.getIdElement().toUnqualifiedVersionless().getValue();
}
theChannelProducer.send(theWrappedMessageToSend);
ourLog.debug("Delivering {} message payload {} for {}", theSourceMessage.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
}

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.test.utilities.server.TransactionCapturingProviderExtension;
import ca.uhn.fhir.util.BundleUtil;
import com.apicatalog.jsonld.StringUtils;
import net.ttddyy.dsproxy.QueryCount;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -127,19 +128,27 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
if (theExtension != null) {
subscription.getChannel().addExtension(theExtension);
}
MethodOutcome methodOutcome;
if (id != null) {
subscription.setId(id);
methodOutcome = myClient.update().resource(subscription).execute();
} else {
methodOutcome = myClient.create().resource(subscription).execute();
}
subscription.setId(methodOutcome.getId().getIdPart());
mySubscriptionIds.add(methodOutcome.getId());
subscription = postOrPutSubscription(subscription);
mySubscriptionIds.add(subscription.getIdElement());
return subscription;
}
protected Subscription postOrPutSubscription(IBaseResource theSubscription) {
MethodOutcome methodOutcome;
if (theSubscription.getIdElement().isEmpty()) {
methodOutcome = myClient.create().resource(theSubscription).execute();
} else {
methodOutcome = myClient.update().resource(theSubscription).execute();
}
theSubscription.setId(methodOutcome.getId().getIdPart());
return (Subscription) theSubscription;
}
protected Subscription newSubscription(String theCriteria, String thePayload) {
Subscription subscription = new Subscription();
@ -166,11 +175,16 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
protected Observation sendObservation(String theCode, String theSystem) {
Observation observation = createBaseObservation(theCode, theSystem);
return sendObservation(theCode, theSystem, null);
}
protected Observation sendObservation(String theCode, String theSystem, String theSource) {
Observation observation = createBaseObservation(theCode, theSystem);
if (!StringUtils.isBlank(theSource)) {
observation.getMeta().setSource(theSource);
}
IIdType id = myObservationDao.create(observation).getId();
observation.setId(id);
return observation;
}

View File

@ -0,0 +1,124 @@
package ca.uhn.fhir.jpa.subscription.message;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR4Test;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import ca.uhn.fhir.jpa.subscription.channel.subscription.SubscriptionChannelFactory;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.test.util.StoppableSubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
* Test the rest-hook subscriptions
*/
public class MessageSubscriptionR4Test extends BaseSubscriptionsR4Test {
@Autowired
private SubscriptionChannelFactory myChannelFactory ;
private static final Logger ourLog = LoggerFactory.getLogger(MessageSubscriptionR4Test.class);
private TestQueueConsumerHandler<ResourceModifiedJsonMessage> handler;
@Autowired
StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber;
@AfterEach
public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() {
myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null);
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
myDaoConfig.setTriggerSubscriptionsForNonVersioningChanges(new DaoConfig().isTriggerSubscriptionsForNonVersioningChanges());
}
@BeforeEach
public void beforeRegisterRestHookListener() {
mySubscriptionTestUtil.registerMessageInterceptor();
IChannelReceiver receiver = myChannelFactory.newMatchingReceivingChannel("my-queue-name", new ChannelConsumerSettings());
handler = new TestQueueConsumerHandler();
receiver.subscribe(handler);
}
private Subscription createObservationSubscription() {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
subscription.setCriteria("[Observation]");
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.MESSAGE);
channel.setPayload("application/fhir+json");
channel.setEndpoint("channel:my-queue-name");
subscription.setChannel(channel);
postOrPutSubscription(subscription);
return subscription;
}
@ParameterizedTest
@ValueSource(strings = {"[*]", "Observation?code=zoop", "[Observation]"})
public void testCreateUpdateAndPatchRetainCorrectSourceThroughDelivery() throws Exception {
createObservationSubscription();
waitForActivatedSubscriptionCount(1);
String source = "http://some-random-system.com";
Observation obs = sendObservation("zoop", "SNOMED-CT", source);
//Quick validation source stored.
Observation readObs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
assertThat(readObs.getMeta().getSource(), is(equalTo(source)));
// Should see 1 subscription notification
waitForQueueToDrain();
//Should receive at our queue receiver
Observation receivedObs = fetchSingleObservationFromSubscriptionTerminalEndpoint();
assertThat(receivedObs.getMeta().getSource(), is(equalTo(source)));
//Then when we update the resource with a new source
String newSource = "http://some-new-source";
receivedObs.getMeta().setSource(newSource);
Coding coding = new CodeableConcept().addCoding().setSystem("system").setCode("zoop");
receivedObs.setCode(new CodeableConcept().addCoding(coding));
myObservationDao.update(receivedObs, new SystemRequestDetails());
//We should see that source reflected in our subscription.
waitForQueueToDrain();
await().until(() -> handler.getMessages().size() == 1);
receivedObs = fetchSingleObservationFromSubscriptionTerminalEndpoint();
assertThat(receivedObs.getMeta().getSource(), is(equalTo(newSource)));
}
private Observation fetchSingleObservationFromSubscriptionTerminalEndpoint() {
assertThat(handler.getMessages().size(), is(equalTo(1)));
ResourceModifiedJsonMessage resourceModifiedJsonMessage = handler.getMessages().get(0);
ResourceModifiedMessage payload = resourceModifiedJsonMessage.getPayload();
String payloadString = payload.getPayloadString();
IBaseResource resource = myFhirContext.newJsonParser().parseResource(payloadString);
Observation receivedObs = (Observation) resource;
handler.clearMessages();
return receivedObs;
}
}

View File

@ -0,0 +1,31 @@
package ca.uhn.fhir.jpa.subscription.message;
import org.slf4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.ArrayList;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
public class TestQueueConsumerHandler<T> implements MessageHandler {
private static final Logger ourLog = getLogger(TestQueueConsumerHandler.class);
List<T> myMessages;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
getMessages().add((T)message);
ourLog.info("Received message: {}", message);
}
public void clearMessages() {
myMessages.clear();;
}
public List<T> getMessages() {
if (myMessages == null) {
myMessages = new ArrayList<>();
}
return myMessages;
}
}

View File

@ -135,7 +135,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
* Send version 1
*/
Observation obs = sendObservation(code, "SNOMED-CT");
Observation obs = sendObservation(code, "SNOMED-CT", "http://source-system.com");
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
// Should see 1 subscription notification
@ -145,6 +145,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals("http://source-system.com", ourObservationProvider.getStoredResources().get(0).getMeta().getSource());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
@ -154,6 +155,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
*/
obs.getIdentifierFirstRep().setSystem("foo").setValue("2");
obs.getMeta().setSource("http://other-source");
myObservationDao.update(obs);
obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless());
@ -164,6 +166,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals("http://other-source", ourObservationProvider.getStoredResources().get(0).getMeta().getSource());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());

View File

@ -81,6 +81,11 @@ public class SubscriptionTestUtil {
mySubscriptionSubmitInterceptorLoader.start();
}
public void registerMessageInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.MESSAGE);
mySubscriptionSubmitInterceptorLoader.start();
}
public void registerWebSocketInterceptor() {
myDaoConfig.addSupportedSubscriptionType(Subscription.SubscriptionChannelType.WEBSOCKET);
mySubscriptionSubmitInterceptorLoader.start();