Add support for deletes to stu3/dstu2 subscriptions. (#3836)

* Add support for DSTU3 delete subscrion events, add test

* Add changelogP
This commit is contained in:
Tadgh 2022-07-22 15:24:59 -04:00 committed by GitHub
parent cbcf3f1942
commit 6d9bbbec1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 7 deletions

View File

@ -0,0 +1,5 @@
---
type: add
issue: 3243
title: "Previously, the `http://hapifhir.io/fhir/StructureDefinition/subscription-send-delete-messages` extension on REST-HOOK subscription channel element was only valid for R4. This has been expanded to support DSTU3 and DSTU2."

View File

@ -1,8 +1,10 @@
package ca.uhn.fhir.jpa.subscription;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -18,6 +20,13 @@ public class NotificationServlet extends HttpServlet {
private final List<String> receivedAuthorizationHeaders = Collections.synchronizedList(new ArrayList<>());
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
receivedNotificationCount.incrementAndGet();
receivedAuthorizationHeaders.add(req.getHeader("Authorization"));
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
receivedNotificationCount.incrementAndGet();

View File

@ -9,7 +9,10 @@ import ca.uhn.fhir.jpa.subscription.NotificationServlet;
import ca.uhn.fhir.jpa.test.util.SubscriptionTestUtil;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.util.SubscriptionDebugLogInterceptor;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.Delete;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants;
@ -24,10 +27,12 @@ import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.dstu3.model.BooleanType;
import org.hl7.fhir.dstu3.model.CodeableConcept;
import org.hl7.fhir.dstu3.model.Coding;
import org.hl7.fhir.dstu3.model.CommunicationRequest;
import org.hl7.fhir.dstu3.model.DateTimeType;
import org.hl7.fhir.dstu3.model.Extension;
import org.hl7.fhir.dstu3.model.IdType;
import org.hl7.fhir.dstu3.model.Observation;
import org.hl7.fhir.dstu3.model.Organization;
@ -40,6 +45,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
@ -51,6 +58,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -74,6 +82,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
private static Server ourListenerServer;
private static String ourListenerServerBase;
private static final List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
private static final List<IIdType> ourDeletedObservationIds = Collections.synchronizedList(Lists.newArrayList());
private static final List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
private static NotificationServlet ourNotificationServlet;
private static String ourNotificationListenerServer;
@ -122,12 +131,12 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
}
private Subscription createSubscription(String criteria, String payload, String endpoint) throws InterruptedException {
return createSubscription(criteria, payload, endpoint, null);
return createSubscription(criteria, payload, endpoint, null, null);
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint,
List<StringType> headers) throws InterruptedException {
Subscription subscription = newSubscription(theCriteria, thePayload, theEndpoint, headers);
List<StringType> headers, Extension theChannelExtension) throws InterruptedException {
Subscription subscription = newSubscription(theCriteria, thePayload, theEndpoint, headers, theChannelExtension);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
mySubscriptionIds.add(methodOutcome.getId());
@ -138,7 +147,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
}
@Nonnull
private Subscription newSubscription(String theCriteria, String thePayload, String theEndpoint, List<StringType> headers) {
private Subscription newSubscription(String theCriteria, String thePayload, String theEndpoint, List<StringType> headers, Extension theChannelExtension) {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
@ -151,6 +160,9 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
if (headers != null) {
channel.setHeader(headers);
}
if (theChannelExtension != null ) {
channel.addExtension(theChannelExtension);
}
subscription.setChannel(channel);
return subscription;
}
@ -191,6 +203,27 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
assertEquals(HapiExtensions.EXT_SUBSCRIPTION_MATCHING_STRATEGY, tag.get(0).getSystem());
assertEquals(SubscriptionMatchingStrategy.IN_MEMORY.toString(), tag.get(0).getCode());
}
@ParameterizedTest
@ValueSource(strings = {"[*]", "[Observation]", "Observation?"})
public void RestHookSubscriptionWithPayloadSendsDeleteRequest(String theCriteria) throws Exception {
String payload = "application/json";
Extension sendDeleteMessagesExtension = new Extension()
.setUrl(EX_SEND_DELETE_MESSAGES)
.setValue(new BooleanType(true));
waitForActivatedSubscriptionCount(0);
createSubscription(theCriteria, payload, ourNotificationListenerServer, null, sendDeleteMessagesExtension);
waitForActivatedSubscriptionCount(1);
Observation observation = sendObservation("OB-01", "SNOMED-CT");
ourNotificationServlet.reset();
ourLog.info("** About to delete observation");
myObservationDao.delete(IdDt.of(observation).toUnqualifiedVersionless());
await().until(() -> ourNotificationServlet.getReceivedNotificationCount() == 1);
}
@Test
public void testRestHookSubscription() throws Exception {
@ -199,7 +232,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
String criteria2 = "Observation?code=SNOMED-CT|" + code + "111";
createSubscription(criteria1, null, ourNotificationListenerServer,
Collections.singletonList(new StringType("Authorization: abc-def")));
Collections.singletonList(new StringType("Authorization: abc-def")), null);
createSubscription(criteria2, null, ourNotificationListenerServer);
ourLog.debug("Sending first observation");
@ -271,7 +304,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
String source = "foosource";
String criteria = "Observation?_source=" + source;
Subscription subscription = newSubscription(criteria, payload, ourListenerServerBase, null);
Subscription subscription = newSubscription(criteria, payload, ourListenerServerBase, null, null);
MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute();
Subscription savedSub = (Subscription) methodOutcome.getResource();
assertInMemoryTag(savedSub);
@ -587,7 +620,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
@Test
public void testSubscriptionWithNoStatusIsRejected() {
Subscription subscription = newSubscription("Observation?", "application/json", null, null);
Subscription subscription = newSubscription("Observation?", "application/json", null, null, null);
subscription.setStatus(null);
try {
@ -621,6 +654,13 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedObservations.size());
return new MethodOutcome(new IdType("Observation/1"), false);
}
@Delete
public MethodOutcome delete(@IdParam IIdType theIIdType, HttpServletRequest theRequest) {
ourDeletedObservationIds.add(theIIdType);
ourLog.info("Received Listener Delete(now have {} deletes)", ourDeletedObservationIds.size());
return new MethodOutcome(new IdType("Observation/1"), false);
}
}
public static class CommunicationRequestListener implements IResourceProvider {
@ -675,6 +715,7 @@ public class RestHookTestDstu3Test extends BaseResourceProviderDstu3Test {
servletHolder = new ServletHolder();
servletHolder.setServlet(ourNotificationServlet);
proxyHandler.addServlet(servletHolder, "/fhir/subscription");
proxyHandler.addServlet(servletHolder, "/fhir/subscription/*");
ourListenerServer.setHandler(proxyHandler);
JettyUtil.startServer(ourListenerServer);

View File

@ -27,6 +27,7 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import ca.uhn.fhir.model.api.ExtensionDt;
import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -42,6 +43,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Extension;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -51,6 +53,7 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static ca.uhn.fhir.util.HapiExtensions.EX_SEND_DELETE_MESSAGES;
@ -98,12 +101,23 @@ public class SubscriptionCanonicalizer {
retVal.setPayloadString(subscription.getChannel().getPayload());
retVal.setTags(extractTags(subscription));
retVal.setCrossPartitionEnabled(SubscriptionUtil.isCrossPartition(theSubscription));
retVal.setSendDeleteMessages(extractDeleteExtensionDstu2(subscription));
} catch (FHIRException theE) {
throw new InternalErrorException(Msg.code(557) + theE);
}
return retVal;
}
private boolean extractDeleteExtensionDstu2(ca.uhn.fhir.model.dstu2.resource.Subscription theSubscription) {
return theSubscription.getChannel().getUndeclaredExtensionsByUrl(EX_SEND_DELETE_MESSAGES)
.stream()
.map(ExtensionDt::getValue)
.map(value -> (org.hl7.fhir.dstu2.model.BooleanType) value)
.map(org.hl7.fhir.dstu2.model.BooleanType::booleanValue)
.findFirst()
.orElse(false);
}
/**
* Extract the meta tags from the subscription and convert them to a simple string map.
* @param theSubscription The subscription to extract the tags from
@ -167,6 +181,7 @@ public class SubscriptionCanonicalizer {
retVal.getRestHookDetails().setStripVersionId(Boolean.parseBoolean(stripVersionIds));
retVal.getRestHookDetails().setDeliverLatestVersion(Boolean.parseBoolean(deliverLatestVersion));
}
retVal.setSendDeleteMessages(extractSendDeletesDstu3(subscription));
} catch (FHIRException theE) {
throw new InternalErrorException(Msg.code(560) + theE);
@ -174,6 +189,17 @@ public class SubscriptionCanonicalizer {
return retVal;
}
@NotNull
private Boolean extractSendDeletesDstu3(org.hl7.fhir.dstu3.model.Subscription subscription) {
return subscription.getChannel().getExtensionsByUrl(EX_SEND_DELETE_MESSAGES).stream()
.map(org.hl7.fhir.dstu3.model.Extension::getValue)
.filter(val -> val instanceof org.hl7.fhir.dstu3.model.BooleanType)
.map(val -> (org.hl7.fhir.dstu3.model.BooleanType) val)
.map(org.hl7.fhir.dstu3.model.BooleanType::booleanValue)
.findFirst()
.orElse(false);
}
private @Nonnull
Map<String, List<String>> extractExtension(IBaseResource theSubscription) {
try {