From 2999a292e6e053dbdd1d518bd61b67a6aba4b177 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Sun, 11 Aug 2019 18:32:47 -0400 Subject: [PATCH] Get subscriptions working for R5 --- .../subscription/BaseSubscriptionsR5Test.java | 240 +++++ .../resthook/RestHookTestR5Test.java | 957 ++++++++++++++++++ .../cache/SubscriptionCanonicalizer.java | 64 ++ 3 files changed, 1261 insertions(+) create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR5Test.java diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java new file mode 100644 index 00000000000..ce7b4ce0bec --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/BaseSubscriptionsR5Test.java @@ -0,0 +1,240 @@ +package ca.uhn.fhir.jpa.subscription; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.DaoConfig; +import ca.uhn.fhir.jpa.provider.r5.BaseResourceProviderR5Test; +import ca.uhn.fhir.jpa.subscription.module.LinkedBlockingQueueSubscribableChannel; +import ca.uhn.fhir.rest.annotation.Create; +import ca.uhn.fhir.rest.annotation.ResourceParam; +import ca.uhn.fhir.rest.annotation.Update; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.server.IResourceProvider; +import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.test.utilities.JettyUtil; +import ca.uhn.fhir.util.BundleUtil; +import com.google.common.collect.Lists; +import net.ttddyy.dsproxy.QueryCount; +import net.ttddyy.dsproxy.listener.SingleQueryCountHolder; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.r5.model.*; +import org.junit.*; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.annotation.PostConstruct; +import javax.servlet.http.HttpServletRequest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; + +@Ignore +public abstract class BaseSubscriptionsR5Test extends BaseResourceProviderR5Test { + private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseSubscriptionsR5Test.class); + + private static Server ourListenerServer; + protected static int ourListenerPort; + protected static List ourContentTypes = Collections.synchronizedList(new ArrayList<>()); + protected static List ourHeaders = Collections.synchronizedList(new ArrayList<>()); + private static SingleQueryCountHolder ourCountHolder; + + @Autowired + private SingleQueryCountHolder myCountHolder; + @Autowired + protected SubscriptionTestUtil mySubscriptionTestUtil; + @Autowired + protected SubscriptionMatcherInterceptor mySubscriptionMatcherInterceptor; + + protected CountingInterceptor myCountingInterceptor; + + protected static List ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList()); + protected static List ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList()); + private static String ourListenerServerBase; + + protected List mySubscriptionIds = Collections.synchronizedList(new ArrayList<>()); + + + @After + public void afterUnregisterRestHookListener() { + for (IIdType next : mySubscriptionIds) { + IIdType nextId = next.toUnqualifiedVersionless(); + ourLog.info("Deleting: {}", nextId); + ourClient.delete().resourceById(nextId).execute(); + } + mySubscriptionIds.clear(); + + myDaoConfig.setAllowMultipleDelete(true); + ourLog.info("Deleting all subscriptions"); + ourClient.delete().resourceConditionalByUrl("Subscription?status=active").execute(); + ourClient.delete().resourceConditionalByUrl("Observation?code:missing=false").execute(); + ourLog.info("Done deleting all subscriptions"); + myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); + + mySubscriptionTestUtil.unregisterSubscriptionInterceptor(); + } + + @Before + public void beforeRegisterRestHookListener() { + mySubscriptionTestUtil.registerRestHookInterceptor(); + } + + @Before + public void beforeReset() throws Exception { + ourCreatedObservations.clear(); + ourUpdatedObservations.clear(); + ourContentTypes.clear(); + ourHeaders.clear(); + + // Delete all Subscriptions + if (ourClient != null) { + Bundle allSubscriptions = ourClient.search().forResource(Subscription.class).returnBundle(Bundle.class).execute(); + for (IBaseResource next : BundleUtil.toListOfResources(myFhirCtx, allSubscriptions)) { + ourClient.delete().resource(next).execute(); + } + waitForActivatedSubscriptionCount(0); + } + + LinkedBlockingQueueSubscribableChannel processingChannel = mySubscriptionMatcherInterceptor.getProcessingChannelForUnitTest(); + if (processingChannel != null) { + processingChannel.clearInterceptorsForUnitTest(); + } + myCountingInterceptor = new CountingInterceptor(); + if (processingChannel != null) { + processingChannel.addInterceptorForUnitTest(myCountingInterceptor); + } + } + + + protected Subscription createSubscription(String theCriteria, String thePayload) { + Subscription subscription = newSubscription(theCriteria, thePayload); + + MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); + subscription.setId(methodOutcome.getId().getIdPart()); + mySubscriptionIds.add(methodOutcome.getId()); + + return subscription; + } + + protected Subscription newSubscription(String theCriteria, String thePayload) { + Subscription subscription = new Subscription(); + subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); + subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); + subscription.setCriteria(theCriteria); + + Subscription.SubscriptionChannelComponent channel = subscription.getChannel(); + channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); + channel.setPayload(thePayload); + channel.setEndpoint(ourListenerServerBase); + return subscription; + } + + + protected void waitForQueueToDrain() throws InterruptedException { + mySubscriptionTestUtil.waitForQueueToDrain(); + } + + @PostConstruct + public void initializeOurCountHolder() { + ourCountHolder = myCountHolder; + } + + + protected Observation sendObservation(String code, String system) { + Observation observation = new Observation(); + CodeableConcept codeableConcept = new CodeableConcept(); + observation.setCode(codeableConcept); + observation.getIdentifierFirstRep().setSystem("foo").setValue("1"); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code); + coding.setSystem(system); + + observation.setStatus(Observation.ObservationStatus.FINAL); + + IIdType id = myObservationDao.create(observation).getId(); + observation.setId(id); + + return observation; + } + + + + public static class ObservationListener implements IResourceProvider { + + @Create + public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { + ourLog.info("Received Listener Create"); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourCreatedObservations.add(theObservation); + extractHeaders(theRequest); + return new MethodOutcome(new IdType("Observation/1"), true); + } + + private void extractHeaders(HttpServletRequest theRequest) { + Enumeration headerNamesEnum = theRequest.getHeaderNames(); + while (headerNamesEnum.hasMoreElements()) { + String nextName = headerNamesEnum.nextElement(); + Enumeration valueEnum = theRequest.getHeaders(nextName); + while (valueEnum.hasMoreElements()) { + String nextValue = valueEnum.nextElement(); + ourHeaders.add(nextName + ": " + nextValue); + } + } + } + + @Override + public Class getResourceType() { + return Observation.class; + } + + @Update + public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) { + ourLog.info("Received Listener Update"); + ourUpdatedObservations.add(theObservation); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + extractHeaders(theRequest); + return new MethodOutcome(new IdType("Observation/1"), false); + } + + } + + @AfterClass + public static void reportTotalSelects() { + ourLog.info("Total database select queries: {}", getQueryCount().getSelect()); + } + + private static QueryCount getQueryCount() { + return ourCountHolder.getQueryCountMap().get(""); + } + + @BeforeClass + public static void startListenerServer() throws Exception { + RestfulServer ourListenerRestServer = new RestfulServer(FhirContext.forR5()); + + ObservationListener obsListener = new ObservationListener(); + ourListenerRestServer.setResourceProviders(obsListener); + + ourListenerServer = new Server(0); + + ServletContextHandler proxyHandler = new ServletContextHandler(); + proxyHandler.setContextPath("/"); + + ServletHolder servletHolder = new ServletHolder(); + servletHolder.setServlet(ourListenerRestServer); + proxyHandler.addServlet(servletHolder, "/fhir/context/*"); + + ourListenerServer.setHandler(proxyHandler); + JettyUtil.startServer(ourListenerServer); + ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer); + ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + } + + @AfterClass + public static void stopListenerServer() throws Exception { + JettyUtil.closeServer(ourListenerServer); + } + +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR5Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR5Test.java new file mode 100644 index 00000000000..1c85ab65dc6 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestR5Test.java @@ -0,0 +1,957 @@ +package ca.uhn.fhir.jpa.subscription.resthook; + +import ca.uhn.fhir.jpa.config.StoppableSubscriptionDeliveringRestHookSubscriber; +import ca.uhn.fhir.jpa.model.util.JpaConstants; +import ca.uhn.fhir.jpa.subscription.BaseSubscriptionsR5Test; +import ca.uhn.fhir.rest.api.CacheControlDirective; +import ca.uhn.fhir.rest.api.Constants; +import ca.uhn.fhir.rest.api.MethodOutcome; +import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; +import org.hl7.fhir.instance.model.api.IBaseBundle; +import org.hl7.fhir.r5.model.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +/** + * Test the rest-hook subscriptions + */ +public class RestHookTestR5Test extends BaseSubscriptionsR5Test { + private static final Logger ourLog = LoggerFactory.getLogger(RestHookTestR5Test.class); + + @Autowired + StoppableSubscriptionDeliveringRestHookSubscriber myStoppableSubscriptionDeliveringRestHookSubscriber; + + @After + public void cleanupStoppableSubscriptionDeliveringRestHookSubscriber() { + ourLog.info("@After"); + myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(null); + myStoppableSubscriptionDeliveringRestHookSubscriber.unPause(); + } + + @Test + public void testRestHookSubscriptionApplicationFhirJson() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + createSubscription(criteria1, payload); + createSubscription(criteria2, payload); + waitForActivatedSubscriptionCount(2); + + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + } + + @Test + public void testUpdatesHaveCorrectMetadata() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?"; + + createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + /* + * Send version 1 + */ + + Observation obs = sendObservation(code, "SNOMED-CT"); + obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless()); + + // Should see 1 subscription notification + waitForQueueToDrain(); + int idx = 0; + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx)); + assertEquals("1", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart()); + assertEquals("1", ourUpdatedObservations.get(idx).getMeta().getVersionId()); + assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString()); + assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString()); + assertEquals("1", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue()); + + /* + * Send version 2 + */ + + obs.getIdentifierFirstRep().setSystem("foo").setValue("2"); + myObservationDao.update(obs); + obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless()); + + // Should see 1 subscription notification + waitForQueueToDrain(); + idx++; + waitForSize(0, ourCreatedObservations); + waitForSize(2, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx)); + assertEquals("2", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart()); + assertEquals("2", ourUpdatedObservations.get(idx).getMeta().getVersionId()); + assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString()); + assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString()); + assertEquals("2", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue()); + } + + @Test + public void testPlaceholderReferencesInTransactionAreResolvedCorrectly() throws Exception { + + String payload = "application/fhir+json"; + String code = "1000000050"; + String criteria1 = "Observation?"; + createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + // Create a transaction that should match + Bundle bundle = new Bundle(); + bundle.setType(Bundle.BundleType.TRANSACTION); + + Patient patient = new Patient(); + patient.setId(IdType.newRandomUuid()); + patient.getIdentifierFirstRep().setSystem("foo").setValue("AAA"); + bundle.addEntry().setResource(patient).getRequest().setMethod(Bundle.HTTPVerb.POST).setUrl("Patient"); + + Observation observation = new Observation(); + observation.getIdentifierFirstRep().setSystem("foo").setValue("1"); + observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT"); + observation.setStatus(Observation.ObservationStatus.FINAL); + observation.getSubject().setReference(patient.getId()); + bundle.addEntry().setResource(observation).getRequest().setMethod(Bundle.HTTPVerb.POST).setUrl("Observation"); + + // Send the transaction + mySystemDao.transaction(null, bundle); + + waitForSize(1, ourUpdatedObservations); + + assertThat(ourUpdatedObservations.get(0).getSubject().getReference(), matchesPattern("Patient/[0-9]+")); + } + + @Test + public void testUpdatesHaveCorrectMetadataUsingTransactions() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?"; + + createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + /* + * Send version 1 + */ + + Observation observation = new Observation(); + observation.getIdentifierFirstRep().setSystem("foo").setValue("1"); + observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT"); + observation.setStatus(Observation.ObservationStatus.FINAL); + Bundle bundle = new Bundle(); + bundle.setType(Bundle.BundleType.TRANSACTION); + bundle.addEntry().setResource(observation).getRequest().setMethod(Bundle.HTTPVerb.POST).setUrl("Observation"); + Bundle responseBundle = mySystemDao.transaction(null, bundle); + + Observation obs = myObservationDao.read(new IdType(responseBundle.getEntry().get(0).getResponse().getLocation())); + + // Should see 1 subscription notification + waitForQueueToDrain(); + int idx = 0; + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx)); + assertEquals("1", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart()); + assertEquals("1", ourUpdatedObservations.get(idx).getMeta().getVersionId()); + assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString()); + assertEquals("1", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue()); + + /* + * Send version 2 + */ + + observation = new Observation(); + observation.setId(obs.getId()); + observation.getIdentifierFirstRep().setSystem("foo").setValue("2"); + observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT"); + observation.setStatus(Observation.ObservationStatus.FINAL); + bundle = new Bundle(); + bundle.setType(Bundle.BundleType.TRANSACTION); + bundle.addEntry().setResource(observation).getRequest().setMethod(Bundle.HTTPVerb.PUT).setUrl(obs.getIdElement().toUnqualifiedVersionless().getValue()); + mySystemDao.transaction(null, bundle); + obs = myObservationDao.read(obs.getIdElement().toUnqualifiedVersionless()); + + // Should see 1 subscription notification + waitForQueueToDrain(); + idx++; + waitForSize(0, ourCreatedObservations); + waitForSize(2, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx)); + assertEquals("2", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart()); + assertEquals("2", ourUpdatedObservations.get(idx).getMeta().getVersionId()); + assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString()); + assertEquals("2", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue()); + } + + @Test + public void testRepeatedDeliveries() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?"; + + createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + for (int i = 0; i < 100; i++) { + Observation observation = new Observation(); + observation.getIdentifierFirstRep().setSystem("foo").setValue("ID" + i); + observation.getCode().addCoding().setCode(code).setSystem("SNOMED-CT"); + observation.setStatus(Observation.ObservationStatus.FINAL); + myObservationDao.create(observation); + } + + waitForSize(100, ourUpdatedObservations); + } + + @Test + public void testActiveSubscriptionShouldntReActivate() throws Exception { + String criteria = "Observation?code=111111111&_format=xml"; + String payload = "application/fhir+json"; + createSubscription(criteria, payload); + + waitForActivatedSubscriptionCount(1); + for (int i = 0; i < 5; i++) { + int changes = this.mySubscriptionLoader.doSyncSubscriptionsForUnitTest(); + assertEquals(0, changes); + } + } + + @Test + public void testRestHookSubscriptionNoopUpdateDoesntTriggerNewDelivery() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + createSubscription(criteria1, payload); + createSubscription(criteria2, payload); + waitForActivatedSubscriptionCount(2); + + Observation obs = sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + + // Send an update with no changes + obs.setId(obs.getIdElement().toUnqualifiedVersionless()); + ourClient.update().resource(obs).execute(); + + // Should be no further deliveries + Thread.sleep(1000); + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + + + } + + @Test + public void testRestHookSubscriptionApplicationJsonDisableVersionIdInDelivery() throws Exception { + String payload = "application/json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + + waitForActivatedSubscriptionCount(0); + Subscription subscription1 = createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + + ourLog.info("** About to send observation"); + Observation observation1 = sendObservation(code, "SNOMED-CT"); + + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + + IdType idElement = ourUpdatedObservations.get(0).getIdElement(); + assertEquals(observation1.getIdElement().getIdPart(), idElement.getIdPart()); + // VersionId is present + assertEquals(observation1.getIdElement().getVersionIdPart(), idElement.getVersionIdPart()); + + subscription1 + .getChannel() + .addExtension(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS, new BooleanType("true")); + ourLog.info("** About to update subscription"); + + int modCount = myCountingInterceptor.getSentCount(); + ourClient.update().resource(subscription1).execute(); + waitForSize(modCount + 1, () -> myCountingInterceptor.getSentCount()); + + ourLog.info("** About to send observation"); + Observation observation2 = sendObservation(code, "SNOMED-CT"); + + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(2, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(1)); + + idElement = ourUpdatedObservations.get(1).getIdElement(); + assertEquals(observation2.getIdElement().getIdPart(), idElement.getIdPart()); + // Now VersionId is stripped + assertEquals(null, idElement.getVersionIdPart()); + } + + @Test + public void testRestHookSubscriptionDoesntGetLatestVersionByDefault() throws Exception { + String payload = "application/json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + + waitForActivatedSubscriptionCount(0); + createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + myStoppableSubscriptionDeliveringRestHookSubscriber.pause(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch); + + ourLog.info("** About to send observation"); + Observation observation = sendObservation(code, "SNOMED-CT"); + assertEquals("1", observation.getIdElement().getVersionIdPart()); + assertNull(observation.getNoteFirstRep().getText()); + + observation.getNoteFirstRep().setText("changed"); + MethodOutcome methodOutcome = ourClient.update().resource(observation).execute(); + assertEquals("2", methodOutcome.getId().getVersionIdPart()); + assertEquals("changed", observation.getNoteFirstRep().getText()); + + // Wait for our two delivery channel threads to be paused + assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS)); + // Open the floodgates! + myStoppableSubscriptionDeliveringRestHookSubscriber.unPause(); + + + waitForSize(0, ourCreatedObservations); + waitForSize(2, ourUpdatedObservations); + + Observation observation1 = ourUpdatedObservations.get(0); + Observation observation2 = ourUpdatedObservations.get(1); + + assertEquals("1", observation1.getIdElement().getVersionIdPart()); + assertNull(observation1.getNoteFirstRep().getText()); + assertEquals("2", observation2.getIdElement().getVersionIdPart()); + assertEquals("changed", observation2.getNoteFirstRep().getText()); + } + + @Test + public void testRestHookSubscriptionGetsLatestVersionWithFlag() throws Exception { + String payload = "application/json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + + waitForActivatedSubscriptionCount(0); + + Subscription subscription = newSubscription(criteria1, payload); + subscription + .getChannel() + .addExtension(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION, new BooleanType("true")); + ourClient.create().resource(subscription).execute(); + + waitForActivatedSubscriptionCount(1); + + myStoppableSubscriptionDeliveringRestHookSubscriber.pause(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + myStoppableSubscriptionDeliveringRestHookSubscriber.setCountDownLatch(countDownLatch); + + ourLog.info("** About to send observation"); + Observation observation = sendObservation(code, "SNOMED-CT"); + assertEquals("1", observation.getIdElement().getVersionIdPart()); + assertNull(observation.getNoteFirstRep().getText()); + + observation.getNoteFirstRep().setText("changed"); + MethodOutcome methodOutcome = ourClient.update().resource(observation).execute(); + assertEquals("2", methodOutcome.getId().getVersionIdPart()); + assertEquals("changed", observation.getNoteFirstRep().getText()); + + // Wait for our two delivery channel threads to be paused + assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS)); + // Open the floodgates! + myStoppableSubscriptionDeliveringRestHookSubscriber.unPause(); + + + waitForSize(0, ourCreatedObservations); + waitForSize(2, ourUpdatedObservations); + + Observation observation1 = ourUpdatedObservations.get(0); + Observation observation2 = ourUpdatedObservations.get(1); + + assertEquals("2", observation1.getIdElement().getVersionIdPart()); + assertEquals("changed", observation1.getNoteFirstRep().getText()); + assertEquals("2", observation2.getIdElement().getVersionIdPart()); + assertEquals("changed", observation2.getNoteFirstRep().getText()); + } + + @Test + public void testRestHookSubscriptionApplicationJson() throws Exception { + String payload = "application/json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + Subscription subscription1 = createSubscription(criteria1, payload); + Subscription subscription2 = createSubscription(criteria2, payload); + waitForActivatedSubscriptionCount(2); + + Observation observation1 = sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + + assertEquals("1", ourUpdatedObservations.get(0).getIdElement().getVersionIdPart()); + + Subscription subscriptionTemp = ourClient.read(Subscription.class, subscription2.getId()); + Assert.assertNotNull(subscriptionTemp); + + subscriptionTemp.setCriteria(criteria1); + ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute(); + waitForQueueToDrain(); + + Observation observation2 = sendObservation(code, "SNOMED-CT"); + waitForQueueToDrain(); + + // Should see two subscription notifications + waitForSize(0, ourCreatedObservations); + waitForSize(3, ourUpdatedObservations); + + ourClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute(); + waitForQueueToDrain(); + + Observation observationTemp3 = sendObservation(code, "SNOMED-CT"); + waitForQueueToDrain(); + + // Should see only one subscription notification + waitForSize(0, ourCreatedObservations); + waitForSize(4, ourUpdatedObservations); + + Observation observation3 = ourClient.read(Observation.class, observationTemp3.getId()); + CodeableConcept codeableConcept = new CodeableConcept(); + observation3.setCode(codeableConcept); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code + "111"); + coding.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute(); + + // Should see no subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(4, ourUpdatedObservations); + + Observation observation3a = ourClient.read(Observation.class, observationTemp3.getId()); + + CodeableConcept codeableConcept1 = new CodeableConcept(); + observation3a.setCode(codeableConcept1); + Coding coding1 = codeableConcept1.addCoding(); + coding1.setCode(code); + coding1.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3a).withId(observation3a.getIdElement()).execute(); + + // Should see only one subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(5, ourUpdatedObservations); + + assertFalse(subscription1.getId().equals(subscription2.getId())); + assertFalse(observation1.getId().isEmpty()); + assertFalse(observation2.getId().isEmpty()); + } + + @Test + public void testRestHookSubscriptionApplicationJsonDatabase() throws Exception { + // Same test as above, but now run it using database matching + myDaoConfig.setEnableInMemorySubscriptionMatching(false); + String payload = "application/json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + Subscription subscription1 = createSubscription(criteria1, payload); + Subscription subscription2 = createSubscription(criteria2, payload); + waitForActivatedSubscriptionCount(2); + + Observation observation1 = sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + + assertEquals("1", ourUpdatedObservations.get(0).getIdElement().getVersionIdPart()); + + Subscription subscriptionTemp = ourClient.read(Subscription.class, subscription2.getId()); + Assert.assertNotNull(subscriptionTemp); + + subscriptionTemp.setCriteria(criteria1); + ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute(); + waitForQueueToDrain(); + + Observation observation2 = sendObservation(code, "SNOMED-CT"); + waitForQueueToDrain(); + + // Should see two subscription notifications + waitForSize(0, ourCreatedObservations); + waitForSize(3, ourUpdatedObservations); + + ourClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute(); + waitForQueueToDrain(); + + Observation observationTemp3 = sendObservation(code, "SNOMED-CT"); + waitForQueueToDrain(); + + // Should see only one subscription notification + waitForSize(0, ourCreatedObservations); + waitForSize(4, ourUpdatedObservations); + + Observation observation3 = ourClient.read(Observation.class, observationTemp3.getId()); + CodeableConcept codeableConcept = new CodeableConcept(); + observation3.setCode(codeableConcept); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code + "111"); + coding.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute(); + + // Should see no subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(4, ourUpdatedObservations); + + Observation observation3a = ourClient.read(Observation.class, observationTemp3.getId()); + + CodeableConcept codeableConcept1 = new CodeableConcept(); + observation3a.setCode(codeableConcept1); + Coding coding1 = codeableConcept1.addCoding(); + coding1.setCode(code); + coding1.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3a).withId(observation3a.getIdElement()).execute(); + + // Should see only one subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(5, ourUpdatedObservations); + + assertFalse(subscription1.getId().equals(subscription2.getId())); + assertFalse(observation1.getId().isEmpty()); + assertFalse(observation2.getId().isEmpty()); + } + + @Test + public void testRestHookSubscriptionApplicationXml() throws Exception { + String payload = "application/xml"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + Subscription subscription1 = createSubscription(criteria1, payload); + Subscription subscription2 = createSubscription(criteria2, payload); + waitForActivatedSubscriptionCount(2); + + ourLog.info("** About to send obervation"); + Observation observation1 = sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); + + Subscription subscriptionTemp = ourClient.read(Subscription.class, subscription2.getId()); + Assert.assertNotNull(subscriptionTemp); + subscriptionTemp.setCriteria(criteria1); + ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute(); + waitForQueueToDrain(); + + Observation observation2 = sendObservation(code, "SNOMED-CT"); + waitForQueueToDrain(); + + // Should see two subscription notifications + waitForSize(0, ourCreatedObservations); + waitForSize(3, ourUpdatedObservations); + + ourClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute(); + + Observation observationTemp3 = sendObservation(code, "SNOMED-CT"); + + // Should see only one subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(4, ourUpdatedObservations); + + Observation observation3 = ourClient.read(Observation.class, observationTemp3.getId()); + CodeableConcept codeableConcept = new CodeableConcept(); + observation3.setCode(codeableConcept); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code + "111"); + coding.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute(); + + // Should see no subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(4, ourUpdatedObservations); + + Observation observation3a = ourClient.read(Observation.class, observationTemp3.getId()); + + CodeableConcept codeableConcept1 = new CodeableConcept(); + observation3a.setCode(codeableConcept1); + Coding coding1 = codeableConcept1.addCoding(); + coding1.setCode(code); + coding1.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3a).withId(observation3a.getIdElement()).execute(); + + // Should see only one subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(5, ourUpdatedObservations); + + assertFalse(subscription1.getId().equals(subscription2.getId())); + assertFalse(observation1.getId().isEmpty()); + assertFalse(observation2.getId().isEmpty()); + } + + @Test + public void testSubscriptionTriggerViaSubscription() throws Exception { + String payload = "application/xml"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + + createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + ourLog.info("** About to send obervation"); + + Observation observation = new Observation(); + observation.addIdentifier().setSystem("foo").setValue("bar1"); + observation.setId(IdType.newRandomUuid().getValue()); + CodeableConcept codeableConcept = new CodeableConcept() + .addCoding(new Coding().setCode(code).setSystem("SNOMED-CT")); + observation.setCode(codeableConcept); + observation.setStatus(Observation.ObservationStatus.FINAL); + + Patient patient = new Patient(); + patient.addIdentifier().setSystem("foo").setValue("bar2"); + patient.setId(IdType.newRandomUuid().getValue()); + patient.setActive(true); + observation.getSubject().setReference(patient.getId()); + + Bundle requestBundle = new Bundle(); + requestBundle.setType(Bundle.BundleType.TRANSACTION); + requestBundle.addEntry() + .setResource(observation) + .setFullUrl(observation.getId()) + .getRequest() + .setUrl("Obervation?identifier=foo|bar1") + .setMethod(Bundle.HTTPVerb.PUT); + requestBundle.addEntry() + .setResource(patient) + .setFullUrl(patient.getId()) + .getRequest() + .setUrl("Patient?identifier=foo|bar2") + .setMethod(Bundle.HTTPVerb.PUT); + ourClient.transaction().withBundle(requestBundle).execute(); + + // Should see 1 subscription notification + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); + + Observation obs = ourUpdatedObservations.get(0); + ourLog.info("Observation content: {}", myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(obs)); + } + + @Test + public void testUpdateSubscriptionToMatchLater() throws Exception { + String payload = "application/xml"; + + String code = "1000000050"; + String criteriaBad = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + ourLog.info("** About to create non-matching subscription"); + + Subscription subscription2 = createSubscription(criteriaBad, payload); + + ourLog.info("** About to send observation that wont match"); + + Observation observation1 = sendObservation(code, "SNOMED-CT"); + + // Criteria didn't match, shouldn't see any updates + waitForQueueToDrain(); + Thread.sleep(1000); + assertEquals(0, ourUpdatedObservations.size()); + + Subscription subscriptionTemp = ourClient.read().resource(Subscription.class).withId(subscription2.getId()).execute(); + Assert.assertNotNull(subscriptionTemp); + String criteriaGood = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + subscriptionTemp.setCriteria(criteriaGood); + ourLog.info("** About to update subscription"); + ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute(); + waitForQueueToDrain(); + + ourLog.info("** About to send Observation 2"); + Observation observation2 = sendObservation(code, "SNOMED-CT"); + waitForQueueToDrain(); + + // Should see a subscription notification this time + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + + ourClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute(); + + Observation observationTemp3 = sendObservation(code, "SNOMED-CT"); + + // No more matches + Thread.sleep(1000); + assertEquals(1, ourUpdatedObservations.size()); + } + + @Test + public void testRestHookSubscriptionApplicationXmlJson() throws Exception { + String payload = "application/fhir+xml"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + Subscription subscription1 = createSubscription(criteria1, payload); + Subscription subscription2 = createSubscription(criteria2, payload); + waitForActivatedSubscriptionCount(2); + + Observation observation1 = sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0)); + } + + @Test + public void testRestHookSubscriptionInvalidCriteria() throws Exception { + String payload = "application/xml"; + + String criteria1 = "Observation?codeeeee=SNOMED-CT"; + + try { + createSubscription(criteria1, payload); + fail(); + } catch (UnprocessableEntityException e) { + assertEquals("HTTP 422 Unprocessable Entity: Invalid subscription criteria submitted: Observation?codeeeee=SNOMED-CT Failed to parse match URL[Observation?codeeeee=SNOMED-CT] - Resource type Observation does not have a parameter with name: codeeeee", e.getMessage()); + } + } + + @Test + public void testSubscriptionWithHeaders() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + + // Add some headers, and we'll also turn back to requested status for fun + Subscription subscription = createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + subscription.getChannel().addHeader("X-Foo: FOO"); + subscription.getChannel().addHeader("X-Bar: BAR"); + subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); + ourClient.update().resource(subscription).execute(); + waitForQueueToDrain(); + + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); + assertThat(ourHeaders, hasItem("X-Foo: FOO")); + assertThat(ourHeaders, hasItem("X-Bar: BAR")); + } + + @Test + public void testDisableSubscription() throws Exception { + String payload = "application/fhir+json"; + + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + + Subscription subscription = createSubscription(criteria1, payload); + waitForActivatedSubscriptionCount(1); + + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + + // Disable + subscription.setStatus(Subscription.SubscriptionStatus.OFF); + ourClient.update().resource(subscription).execute(); + waitForQueueToDrain(); + + // Send another object + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification + waitForQueueToDrain(); + waitForSize(0, ourCreatedObservations); + waitForSize(1, ourUpdatedObservations); + + } + + @Test(expected = UnprocessableEntityException.class) + public void testInvalidProvenanceParam() { + String payload = "application/fhir+json"; + String criteriabad = "Provenance?activity=http://hl7.org/fhir/v3/DocumentCompletion%7CAU"; + Subscription subscription = newSubscription(criteriabad, payload); + ourClient.create().resource(subscription).execute(); + } + + @Test(expected = UnprocessableEntityException.class) + public void testInvalidProcedureRequestParam() { + String payload = "application/fhir+json"; + String criteriabad = "ProcedureRequest?intent=instance-order&category=Laboratory"; + Subscription subscription = newSubscription(criteriabad, payload); + ourClient.create().resource(subscription).execute(); + } + + @Test(expected = UnprocessableEntityException.class) + public void testInvalidBodySiteParam() { + String payload = "application/fhir+json"; + String criteriabad = "BodySite?accessType=Catheter"; + Subscription subscription = newSubscription(criteriabad, payload); + ourClient.create().resource(subscription).execute(); + } + + @Test + public void testGoodSubscriptionPersists() { + assertEquals(0, subscriptionCount()); + String payload = "application/fhir+json"; + String criteriaGood = "Patient?gender=male"; + Subscription subscription = newSubscription(criteriaGood, payload); + ourClient.create().resource(subscription).execute(); + assertEquals(1, subscriptionCount()); + } + + private int subscriptionCount() { + IBaseBundle found = ourClient.search().forResource(Subscription.class).cacheControl(new CacheControlDirective().setNoCache(true)).execute(); + return toUnqualifiedVersionlessIdValues(found).size(); + } + + @Test + public void testSubscriptionWithNoStatusIsRejected() { + Subscription subscription = newSubscription("Observation?", "application/json"); + subscription.setStatus(null); + + try { + ourClient.create().resource(subscription).execute(); + fail(); + } catch (UnprocessableEntityException e) { + assertThat(e.getMessage(), containsString("Can not process submitted Subscription - Subscription.status must be populated on this server")); + } + } + + + @Test + public void testBadSubscriptionDoesntPersist() { + assertEquals(0, subscriptionCount()); + String payload = "application/fhir+json"; + String criteriaBad = "BodySite?accessType=Catheter"; + Subscription subscription = newSubscription(criteriaBad, payload); + try { + ourClient.create().resource(subscription).execute(); + } catch (UnprocessableEntityException e) { + ourLog.info("Expected exception", e); + } + assertEquals(0, subscriptionCount()); + } + + @Test + public void testCustomSearchParam() throws Exception { + String criteria = "Observation?accessType=Catheter,PD%20Catheter"; + + SearchParameter sp = new SearchParameter(); + sp.addBase("Observation"); + sp.setCode("accessType"); + sp.setType(Enumerations.SearchParamType.TOKEN); + sp.setExpression("Observation.extension('Observation#accessType')"); + sp.setXpathUsage(SearchParameter.XPathUsageType.NORMAL); + sp.setStatus(Enumerations.PublicationStatus.ACTIVE); + mySearchParameterDao.create(sp); + mySearchParamRegistry.forceRefresh(); + createSubscription(criteria, "application/json"); + waitForActivatedSubscriptionCount(1); + + { + Observation bodySite = new Observation(); + bodySite.addExtension().setUrl("Observation#accessType").setValue(new Coding().setCode("Catheter")); + MethodOutcome methodOutcome = ourClient.create().resource(bodySite).execute(); + assertEquals(true, methodOutcome.getCreated()); + waitForQueueToDrain(); + waitForSize(1, ourUpdatedObservations); + } + { + Observation observation = new Observation(); + observation.addExtension().setUrl("Observation#accessType").setValue(new Coding().setCode("PD Catheter")); + MethodOutcome methodOutcome = ourClient.create().resource(observation).execute(); + assertEquals(true, methodOutcome.getCreated()); + waitForQueueToDrain(); + waitForSize(2, ourUpdatedObservations); + } + { + Observation observation = new Observation(); + MethodOutcome methodOutcome = ourClient.create().resource(observation).execute(); + assertEquals(true, methodOutcome.getCreated()); + waitForQueueToDrain(); + waitForSize(2, ourUpdatedObservations); + } + { + Observation observation = new Observation(); + observation.addExtension().setUrl("Observation#accessType").setValue(new Coding().setCode("XXX")); + MethodOutcome methodOutcome = ourClient.create().resource(observation).execute(); + assertEquals(true, methodOutcome.getCreated()); + waitForQueueToDrain(); + waitForSize(2, ourUpdatedObservations); + } + + } + + +} diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java index 1766c99c55a..319365e8cab 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionCanonicalizer.java @@ -34,6 +34,8 @@ import org.hl7.fhir.instance.model.api.IBaseReference; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.Extension; +import org.hl7.fhir.r4.model.codesystems.SubscriptionStatus; +import org.hl7.fhir.r5.model.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -67,6 +69,8 @@ public class SubscriptionCanonicalizer { return canonicalizeDstu3(theSubscription); case R4: return canonicalizeR4(theSubscription); + case R5: + return canonicalizeR5(theSubscription); case DSTU2_HL7ORG: case DSTU2_1: default: @@ -169,6 +173,14 @@ public class SubscriptionCanonicalizer { .stream() .collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList()))); } + case R5: { + org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription; + return subscription + .getChannel() + .getExtension() + .stream() + .collect(Collectors.groupingBy(t -> t.getUrl(), mapping(t -> t.getValueAsPrimitive().getValueAsString(), toList()))); + } case DSTU2_HL7ORG: case DSTU2_1: default: { @@ -232,6 +244,56 @@ public class SubscriptionCanonicalizer { return retVal; } + private CanonicalSubscription canonicalizeR5(IBaseResource theSubscription) { + org.hl7.fhir.r5.model.Subscription subscription = (org.hl7.fhir.r5.model.Subscription) theSubscription; + + CanonicalSubscription retVal = new CanonicalSubscription(); + retVal.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus().toCode())); + retVal.setChannelType(CanonicalSubscriptionChannelType.fromCode(subscription.getChannel().getType().toCode())); + retVal.setCriteriaString(subscription.getCriteria()); + retVal.setEndpointUrl(subscription.getChannel().getEndpoint()); + retVal.setHeaders(subscription.getChannel().getHeader()); + retVal.setChannelExtensions(extractExtension(subscription)); + retVal.setIdElement(subscription.getIdElement()); + retVal.setPayloadString(subscription.getChannel().getPayload()); + + if (retVal.getChannelType() == CanonicalSubscriptionChannelType.EMAIL) { + String from; + String subjectTemplate; + try { + from = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM); + subjectTemplate = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE); + } catch (FHIRException theE) { + throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE); + } + retVal.getEmailDetails().setFrom(from); + retVal.getEmailDetails().setSubjectTemplate(subjectTemplate); + } + + if (retVal.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { + String stripVersionIds; + String deliverLatestVersion; + try { + stripVersionIds = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS); + deliverLatestVersion = subscription.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION); + } catch (FHIRException theE) { + throw new ConfigurationException("Failed to extract subscription extension(s): " + theE.getMessage(), theE); + } + retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds)); + retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion)); + } + + List topicExts = subscription.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics"); + if (topicExts.size() > 0) { + IBaseReference ref = (IBaseReference) topicExts.get(0).getValueAsPrimitive(); + if (!"EventDefinition".equals(ref.getReferenceElement().getResourceType())) { + throw new PreconditionFailedException("Topic reference must be an EventDefinition"); + } + } + + return retVal; + } + public String getCriteria(IBaseResource theSubscription) { switch (myFhirContext.getVersion().getVersion()) { case DSTU2: @@ -240,6 +302,8 @@ public class SubscriptionCanonicalizer { return ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getCriteria(); case R4: return ((org.hl7.fhir.r4.model.Subscription) theSubscription).getCriteria(); + case R5: + return ((org.hl7.fhir.r5.model.Subscription) theSubscription).getCriteria(); case DSTU2_1: case DSTU2_HL7ORG: default: