Fix #1084 - Apply headers correctly during merge. Merge branch 'volsch-no_payload_subscription_fix'

This commit is contained in:
jamesagnew 2019-01-18 05:48:03 -05:00
commit 4035b28161
3 changed files with 124 additions and 5 deletions

View File

@ -0,0 +1,39 @@
package ca.uhn.fhir.jpa.subscription;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* Receives subscription notification without payloads.
*/
public class NotificationServlet extends HttpServlet {
private static final long serialVersionUID = 5957950857980374719L;
private final AtomicLong receivedNotificationCount = new AtomicLong();
private final List<String> receivedAuthorizationHeaders = Collections.synchronizedList(new ArrayList<>());
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
receivedNotificationCount.incrementAndGet();
receivedAuthorizationHeaders.add(req.getHeader("Authorization"));
}
public long getReceivedNotificationCount() {
return receivedNotificationCount.get();
}
public List<String> getReceivedAuthorizationHeaders() {
return receivedAuthorizationHeaders;
}
public void reset() {
receivedNotificationCount.set(0);
receivedAuthorizationHeaders.clear();
}
}

View File

@ -4,6 +4,7 @@ package ca.uhn.fhir.jpa.subscription.resthook;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.subscription.NotificationServlet;
import ca.uhn.fhir.jpa.subscription.SubscriptionTestUtil;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
@ -49,6 +50,8 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
@Autowired
private SubscriptionTestUtil mySubscriptionTestUtil;
private static NotificationServlet ourNotificationServlet;
private static String ourNotificationListenerServer;
@After
public void afterUnregisterRestHookListener() {
@ -79,9 +82,15 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourContentTypes.clear();
ourNotificationServlet.reset();
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException {
return createSubscription(criteria, payload, endpoint, null);
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint,
List<StringType> headers) throws InterruptedException {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
@ -91,6 +100,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload(thePayload);
channel.setEndpoint(theEndpoint);
if (headers != null) {
channel.setHeader(headers);
}
subscription.setChannel(channel);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
@ -120,6 +132,55 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
return observation;
}
@Test
public void testRestHookSubscription() throws Exception {
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
createSubscription(criteria1, null, ourNotificationListenerServer,
Collections.singletonList(new StringType("Authorization: abc-def")));
createSubscription(criteria2, null, ourNotificationListenerServer);
sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification with authorization header
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
ourNotificationServlet.reset();
sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification with authorization header
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
ourNotificationServlet.reset();
Observation observationTemp3 = sendObservation(code, "SNOMED-CT");
/// Should see 1 subscription notification with authorization header
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
ourNotificationServlet.reset();
Observation observation3 = ourClient.read(Observation.class, observationTemp3.getId());
CodeableConcept codeableConcept = new CodeableConcept();
observation3.setCode(codeableConcept);
Coding coding = codeableConcept.addCoding();
coding.setCode(code + "111");
coding.setSystem("SNOMED-CT");
ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute();
// Should see 2 subscription notifications with and without authorization header
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
Assert.assertNull(ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
ourNotificationServlet.reset();
}
@Test
public void testRestHookSubscriptionApplicationFhirJson() throws Exception {
String payload = "application/fhir+json";
@ -358,11 +419,13 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
ourListenerPort = PortUtil.findFreePort();
ourListenerRestServer = new RestfulServer(FhirContext.forDstu3());
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ourNotificationListenerServer = "http://localhost:" + ourListenerPort + "/fhir/subscription";
ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener);
ourListenerServer = new Server(ourListenerPort);
ourNotificationServlet = new NotificationServlet();
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
@ -370,6 +433,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourListenerRestServer);
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
servletHolder = new ServletHolder();
servletHolder.setServlet(ourNotificationServlet);
proxyHandler.addServlet(servletHolder, "/fhir/subscription");
ourListenerServer.setHandler(proxyHandler);
ourListenerServer.start();

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
import ca.uhn.fhir.rest.gclient.IClientExecutable;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
@ -38,10 +39,7 @@ import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -183,12 +181,28 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
*/
protected void sendNotification(ResourceDeliveryMessage theMsg) {
Map<String, List<String>> params = new HashMap();
List<Header> headers = new ArrayList<>();
if (theMsg.getSubscription().getHeaders() != null) {
theMsg.getSubscription().getHeaders().stream().filter(Objects::nonNull).forEach(h -> {
final int sep = h.indexOf(':');
if (sep > 0) {
final String name = h.substring(0, sep);
final String value = h.substring(sep + 1);
if (StringUtils.isNotBlank(name)) {
headers.add(new Header(name.trim(), value.trim()));
}
}
});
}
StringBuilder url = new StringBuilder(theMsg.getSubscription().getEndpointUrl());
IHttpClient client = myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers);
IHttpRequest request = client.createParamRequest(myFhirContext, params, null);
try {
IHttpResponse response = request.execute();
// close connection in order to return a possible cached connection to the connection pool
response.close();
} catch (IOException e) {
ourLog.error("Error trying to reach "+ theMsg.getSubscription().getEndpointUrl());
e.printStackTrace();