Subscription without Payload did not contain header.
This commit is contained in:
parent
584179ba39
commit
432ad8e5bc
|
@ -0,0 +1,39 @@
|
|||
package ca.uhn.fhir.jpa.subscription;
|
||||
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Receives subscription notification without payloads.
|
||||
*/
|
||||
public class NotificationServlet extends HttpServlet {
|
||||
private static final long serialVersionUID = 5957950857980374719L;
|
||||
|
||||
private final AtomicLong receivedNotificationCount = new AtomicLong();
|
||||
|
||||
private final List<String> receivedAuthorizationHeaders = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
@Override
|
||||
protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
|
||||
receivedNotificationCount.incrementAndGet();
|
||||
receivedAuthorizationHeaders.add(req.getHeader("Authorization"));
|
||||
}
|
||||
|
||||
public long getReceivedNotificationCount() {
|
||||
return receivedNotificationCount.get();
|
||||
}
|
||||
|
||||
public List<String> getReceivedAuthorizationHeaders() {
|
||||
return receivedAuthorizationHeaders;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
receivedNotificationCount.set(0);
|
||||
receivedAuthorizationHeaders.clear();
|
||||
}
|
||||
}
|
|
@ -49,6 +49,11 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
|
||||
@Autowired
|
||||
private SubscriptionTestUtil mySubscriptionTestUtil;
|
||||
private static NotificationServlet ourNotificationServlet;
|
||||
private static String ourNotificationListenerServer;
|
||||
private static List<Observation> ourUpdatedObservations = Lists.newArrayList();
|
||||
private static List<String> ourContentTypes = new ArrayList<>();
|
||||
private List<IIdType> mySubscriptionIds = new ArrayList<>();
|
||||
|
||||
@After
|
||||
public void afterUnregisterRestHookListener() {
|
||||
|
@ -79,9 +84,15 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
ourCreatedObservations.clear();
|
||||
ourUpdatedObservations.clear();
|
||||
ourContentTypes.clear();
|
||||
ourNotificationServlet.reset();
|
||||
}
|
||||
|
||||
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
|
||||
private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException {
|
||||
return createSubscription(criteria, payload, endpoint, null);
|
||||
}
|
||||
|
||||
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint,
|
||||
List<StringType> headers) throws InterruptedException {
|
||||
Subscription subscription = new Subscription();
|
||||
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
|
||||
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
|
||||
|
@ -91,6 +102,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
|
||||
channel.setPayload(thePayload);
|
||||
channel.setEndpoint(theEndpoint);
|
||||
if (headers != null) {
|
||||
channel.setHeader(headers);
|
||||
}
|
||||
subscription.setChannel(channel);
|
||||
|
||||
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
|
||||
|
@ -120,6 +134,55 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
return observation;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestHookSubscription() throws Exception {
|
||||
String code = "1000000050";
|
||||
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
|
||||
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111&_format=xml";
|
||||
|
||||
createSubscription(criteria1, null, ourNotificationListenerServer,
|
||||
Collections.singletonList(new StringType("Authorization: abc-def")));
|
||||
createSubscription(criteria2, null, ourNotificationListenerServer);
|
||||
|
||||
sendObservation(code, "SNOMED-CT");
|
||||
|
||||
// Should see 1 subscription notification with authorization header
|
||||
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
|
||||
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
|
||||
Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
|
||||
ourNotificationServlet.reset();
|
||||
|
||||
sendObservation(code, "SNOMED-CT");
|
||||
|
||||
// Should see 1 subscription notification with authorization header
|
||||
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
|
||||
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
|
||||
Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
|
||||
ourNotificationServlet.reset();
|
||||
|
||||
Observation observationTemp3 = sendObservation(code, "SNOMED-CT");
|
||||
|
||||
/// Should see 1 subscription notification with authorization header
|
||||
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
|
||||
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
|
||||
Assert.assertEquals("abc-def", ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
|
||||
ourNotificationServlet.reset();
|
||||
|
||||
Observation observation3 = ourClient.read(Observation.class, observationTemp3.getId());
|
||||
CodeableConcept codeableConcept = new CodeableConcept();
|
||||
observation3.setCode(codeableConcept);
|
||||
Coding coding = codeableConcept.addCoding();
|
||||
coding.setCode(code + "111");
|
||||
coding.setSystem("SNOMED-CT");
|
||||
ourClient.update().resource(observation3).withId(observation3.getIdElement()).execute();
|
||||
|
||||
// Should see 2 subscription notifications with and without authorization header
|
||||
waitForSize(1, ourNotificationServlet.getReceivedAuthorizationHeaders());
|
||||
Assert.assertEquals(1, ourNotificationServlet.getReceivedNotificationCount());
|
||||
Assert.assertNull(ourNotificationServlet.getReceivedAuthorizationHeaders().get(0));
|
||||
ourNotificationServlet.reset();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestHookSubscriptionApplicationFhirJson() throws Exception {
|
||||
String payload = "application/fhir+json";
|
||||
|
@ -358,11 +421,13 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
ourListenerPort = PortUtil.findFreePort();
|
||||
ourListenerRestServer = new RestfulServer(FhirContext.forDstu3());
|
||||
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
|
||||
ourNotificationListenerServer = "http://localhost:" + ourListenerPort + "/fhir/subscription";
|
||||
|
||||
ObservationListener obsListener = new ObservationListener();
|
||||
ourListenerRestServer.setResourceProviders(obsListener);
|
||||
|
||||
ourListenerServer = new Server(ourListenerPort);
|
||||
ourNotificationServlet = new NotificationServlet();
|
||||
|
||||
ServletContextHandler proxyHandler = new ServletContextHandler();
|
||||
proxyHandler.setContextPath("/");
|
||||
|
@ -370,6 +435,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
|
|||
ServletHolder servletHolder = new ServletHolder();
|
||||
servletHolder.setServlet(ourListenerRestServer);
|
||||
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
|
||||
servletHolder = new ServletHolder();
|
||||
servletHolder.setServlet(ourNotificationServlet);
|
||||
proxyHandler.addServlet(servletHolder, "/fhir/subscription");
|
||||
|
||||
ourListenerServer.setHandler(proxyHandler);
|
||||
ourListenerServer.start();
|
||||
|
|
|
@ -21,6 +21,12 @@ package ca.uhn.fhir.jpa.subscription.module.subscriber;
|
|||
*/
|
||||
|
||||
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionDeliverySubscriber;
|
||||
import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor;
|
||||
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
|
||||
import ca.uhn.fhir.jpa.subscription.ResourceDeliveryMessage;
|
||||
import ca.uhn.fhir.rest.api.EncodingEnum;
|
||||
import ca.uhn.fhir.rest.api.RequestTypeEnum;
|
||||
import ca.uhn.fhir.rest.client.api.*;
|
||||
|
@ -28,6 +34,7 @@ import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
|
|||
import ca.uhn.fhir.rest.gclient.IClientExecutable;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -183,12 +190,28 @@ public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDe
|
|||
*/
|
||||
protected void sendNotification(ResourceDeliveryMessage theMsg) {
|
||||
Map<String, List<String>> params = new HashMap();
|
||||
|
||||
List<Header> headers = new ArrayList<>();
|
||||
if (theMsg.getSubscription().getHeaders() != null) {
|
||||
theMsg.getSubscription().getHeaders().stream().filter(Objects::nonNull).forEach(h -> {
|
||||
final int sep = h.indexOf(':');
|
||||
if (sep > 0) {
|
||||
final String name = h.substring(0, sep);
|
||||
final String value = h.substring(sep + 1);
|
||||
if (StringUtils.isNotBlank(name)) {
|
||||
headers.add(new Header(name.trim(), value.trim()));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
StringBuilder url = new StringBuilder(theMsg.getSubscription().getEndpointUrl());
|
||||
IHttpClient client = myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers);
|
||||
IHttpRequest request = client.createParamRequest(myFhirContext, params, null);
|
||||
try {
|
||||
IHttpResponse response = request.execute();
|
||||
// close connection in order to return a possible cached connection to the connection pool
|
||||
response.close();
|
||||
} catch (IOException e) {
|
||||
ourLog.error("Error trying to reach "+ theMsg.getSubscription().getEndpointUrl());
|
||||
e.printStackTrace();
|
||||
|
|
Loading…
Reference in New Issue