From 432ad8e5bcc90bf6bbc0835b0f526270e9b62c24 Mon Sep 17 00:00:00 2001 From: Volker Schmidt Date: Thu, 27 Sep 2018 13:46:51 +0200 Subject: [PATCH] Subscription without Payload did not contain header. --- .../jpa/subscription/NotificationServlet.java | 39 +++++++++++ .../resthook/RestHookTestDstu3Test.java | 70 ++++++++++++++++++- ...scriptionDeliveringRestHookSubscriber.java | 23 ++++++ 3 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/NotificationServlet.java diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/NotificationServlet.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/NotificationServlet.java new file mode 100644 index 00000000000..7abb32a3831 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/NotificationServlet.java @@ -0,0 +1,39 @@ +package ca.uhn.fhir.jpa.subscription; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Receives subscription notification without payloads. + */ +public class NotificationServlet extends HttpServlet { + private static final long serialVersionUID = 5957950857980374719L; + + private final AtomicLong receivedNotificationCount = new AtomicLong(); + + private final List receivedAuthorizationHeaders = Collections.synchronizedList(new ArrayList<>()); + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) { + receivedNotificationCount.incrementAndGet(); + receivedAuthorizationHeaders.add(req.getHeader("Authorization")); + } + + public long getReceivedNotificationCount() { + return receivedNotificationCount.get(); + } + + public List getReceivedAuthorizationHeaders() { + return receivedAuthorizationHeaders; + } + + public void reset() { + receivedNotificationCount.set(0); + receivedAuthorizationHeaders.clear(); + } +} diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java index 97716aac54f..a0d71ca92a5 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/resthook/RestHookTestDstu3Test.java @@ -49,6 +49,11 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { @Autowired private SubscriptionTestUtil mySubscriptionTestUtil; + private static NotificationServlet ourNotificationServlet; + private static String ourNotificationListenerServer; + private static List ourUpdatedObservations = Lists.newArrayList(); + private static List ourContentTypes = new ArrayList<>(); + private List mySubscriptionIds = new ArrayList<>(); @After public void afterUnregisterRestHookListener() { @@ -79,9 +84,15 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { ourCreatedObservations.clear(); ourUpdatedObservations.clear(); ourContentTypes.clear(); + ourNotificationServlet.reset(); } - private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException { + private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException { + return createSubscription(criteria, payload, endpoint, null); + } + + private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint, + List headers) throws InterruptedException { Subscription subscription = new Subscription(); subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)"); subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED); @@ -91,6 +102,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { channel.setType(Subscription.SubscriptionChannelType.RESTHOOK); channel.setPayload(thePayload); channel.setEndpoint(theEndpoint); + if (headers != null) { + channel.setHeader(headers); + } subscription.setChannel(channel); MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); @@ -120,6 +134,55 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { return observation; } + @Test + public void testRestHookSubscription() throws Exception { + String code = "1000000050"; + String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml"; + String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml"; + + createSubscription(criteria1, null, ourNotificationListenerServer, + Collections.singletonList(new StringType("Authorization: abc-def"))); + createSubscription(criteria2, null, ourNotificationListenerServer); + + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification with authorization header + waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders()); + Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount()); + Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0)); + ourNotificationServlet.reset(); + + sendObservation(code, "SNOMED-CT"); + + // Should see 1 subscription notification with authorization header + waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders()); + Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount()); + Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0)); + ourNotificationServlet.reset(); + + Observation observationTemp3 = sendObservation(code, "SNOMED-CT"); + + /// Should see 1 subscription notification with authorization header + waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders()); + Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount()); + Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0)); + ourNotificationServlet.reset(); + + Observation observation3 = ourClient.read(Observation.class, observationTemp3.getId()); + CodeableConcept codeableConcept = new CodeableConcept(); + observation3.setCode(codeableConcept); + Coding coding = codeableConcept.addCoding(); + coding.setCode(code + "111"); + coding.setSystem("SNOMED-CT"); + ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute(); + + // Should see 2 subscription notifications with and without authorization header + waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders()); + Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount()); + Assert.assertNull(ourNotificationServlet.getReceivedAuthorizationHeaders().get(0)); + ourNotificationServlet.reset(); + } + @Test public void testRestHookSubscriptionApplicationFhirJson() throws Exception { String payload = "application/fhir+json"; @@ -358,11 +421,13 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { ourListenerPort = PortUtil.findFreePort(); ourListenerRestServer = new RestfulServer(FhirContext.forDstu3()); ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; + ourNotificationListenerServer = "http://localhost:" + ourListenerPort + "/fhir/subscription"; ObservationListener obsListener = new ObservationListener(); ourListenerRestServer.setResourceProviders(obsListener); ourListenerServer = new Server(ourListenerPort); + ourNotificationServlet = new NotificationServlet(); ServletContextHandler proxyHandler = new ServletContextHandler(); proxyHandler.setContextPath("/"); @@ -370,6 +435,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test { ServletHolder servletHolder = new ServletHolder(); servletHolder.setServlet(ourListenerRestServer); proxyHandler.addServlet(servletHolder, "/fhir/context/*"); + servletHolder = new ServletHolder(); + servletHolder.setServlet(ourNotificationServlet); + proxyHandler.addServlet(servletHolder, "/fhir/subscription"); ourListenerServer.setHandler(proxyHandler); ourListenerServer.start(); diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionDeliveringRestHookSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionDeliveringRestHookSubscriber.java index b2d9e803053..383a0ce7272 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionDeliveringRestHookSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/subscriber/SubscriptionDeliveringRestHookSubscriber.java @@ -21,6 +21,12 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber; */ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.subscription.BaseSubscriptionDeliverySubscriber; +import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; +import ca.uhn.fhir.jpa.subscription.CanonicalSubscription; +import ca.uhn.fhir.jpa.subscription.ResourceDeliveryMessage; import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.api.RequestTypeEnum; import ca.uhn.fhir.rest.client.api.*; @@ -28,6 +34,7 @@ import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor; import ca.uhn.fhir.rest.gclient.IClientExecutable; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException; +import org.apache.commons.lang3.StringUtils; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.slf4j.Logger; @@ -183,12 +190,28 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe */ protected void sendNotification(ResourceDeliveryMessage theMsg) { Map> params = new HashMap(); + List
headers = new ArrayList<>(); + if (theMsg.getSubscription().getHeaders() != null) { + theMsg.getSubscription().getHeaders().stream().filter(Objects::nonNull).forEach(h -> { + final int sep = h.indexOf(':'); + if (sep > 0) { + final String name = h.substring(0, sep); + final String value = h.substring(sep + 1); + if (StringUtils.isNotBlank(name)) { + headers.add(new Header(name.trim(), value.trim())); + } + } + }); + } + StringBuilder url = new StringBuilder(theMsg.getSubscription().getEndpointUrl()); IHttpClient client = myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers); IHttpRequest request = client.createParamRequest(myFhirContext, params, null); try { IHttpResponse response = request.execute(); + // close connection in order to return a possible cached connection to the connection pool + response.close(); } catch (IOException e) { ourLog.error("Error trying to reach "+ theMsg.getSubscription().getEndpointUrl()); e.printStackTrace();