Allow multi-resource Subscription criteria (#3110)

* Allow start criteria subscriptions

* Rework subscription tests

* Subscription cleanup

* Add changelog

* Test fixes

* Test fix
This commit is contained in:
James Agnew 2021-10-26 16:48:13 -04:00 committed by GitHub
parent 2cf8e74414
commit 9e057574a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 806 additions and 325 deletions

View File

@ -276,6 +276,10 @@ public class Constants {
public static final String RESOURCE_PARTITION_ID = Constants.class.getName() + "_RESOURCE_PARTITION_ID";
public static final String CT_APPLICATION_GZIP = "application/gzip";
public static final String[] EMPTY_STRING_ARRAY = new String[0];
public static final String SUBSCRIPTION_MULTITYPE_PREFIX = "[";
public static final String SUBSCRIPTION_MULTITYPE_SUFFIX = "]";
public static final String SUBSCRIPTION_MULTITYPE_STAR = "*";
public static final String SUBSCRIPTION_STAR_CRITERIA = SUBSCRIPTION_MULTITYPE_PREFIX + SUBSCRIPTION_MULTITYPE_STAR + SUBSCRIPTION_MULTITYPE_SUFFIX;
static {
CHARSET_UTF8 = StandardCharsets.UTF_8;

View File

@ -1402,6 +1402,28 @@ public class FhirTerser {
}
/**
* Clones a resource object, copying all data elements from theSource into a new copy of the same type.
*
* Note that:
* <ul>
* <li>Only FHIR data elements are copied (i.e. user data maps are not copied)</li>
* <li>If a class extending a HAPI FHIR type (e.g. an instance of a class extending the Patient class) is supplied, an instance of the base type will be returned.</li>
* </ul>
*
*
* @param theSource The source resource
* @return A copy of the source resource
* @since 5.6.0
*/
@SuppressWarnings("unchecked")
public <T extends IBaseResource> T clone(T theSource) {
Validate.notNull(theSource, "theSource must not be null");
T target = (T) myContext.getResourceDefinition(theSource).newInstance();
cloneInto(theSource, target, false);
return target;
}
public enum OptionsEnum {

View File

@ -0,0 +1,6 @@
---
type: add
issue: 3110
title: "Subscription criteria in the HAPI FHIR JPA server now supports an optional alternate syntax of
`[*]` (all resources of all types) and `[resourcename,resourcename,...]` (all resources of the
given types. Note that no search parameters may be specitied with this syntax."

View File

@ -53,7 +53,7 @@ public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test {
mySubscriptionDao.update(s);
fail();
} catch (UnprocessableEntityException e) {
assertEquals("Subscription.criteria must be in the form \"{Resource Type}?[params]\"", e.getMessage());
assertEquals("Subscription.criteria contains invalid/unsupported resource type: FOO", e.getMessage());
}
}

View File

@ -1,62 +1,58 @@
package ca.uhn.fhir.jpa.subscription;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
import ca.uhn.fhir.jpa.subscription.submit.interceptor.SubscriptionMatcherInterceptor;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Transaction;
import ca.uhn.fhir.rest.annotation.TransactionParam;
import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.test.utilities.JettyUtil;
import ca.uhn.fhir.test.utilities.server.HashMapResourceProviderExtension;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.test.utilities.server.TransactionCapturingProviderExtension;
import ca.uhn.fhir.util.BundleUtil;
import com.google.common.collect.Lists;
import net.ttddyy.dsproxy.QueryCount;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseSubscriptionsR4Test.class);
protected static int ourListenerPort;
protected static List<String> ourContentTypes = Collections.synchronizedList(new ArrayList<>());
protected static List<String> ourHeaders = Collections.synchronizedList(new ArrayList<>());
protected static List<Bundle> ourTransactions = Collections.synchronizedList(Lists.newArrayList());
protected static List<Observation> ourCreatedObservations = Collections.synchronizedList(Lists.newArrayList());
protected static List<Observation> ourUpdatedObservations = Collections.synchronizedList(Lists.newArrayList());
private static Server ourListenerServer;
private static SingleQueryCountHolder ourCountHolder;
private static String ourListenerServerBase;
@Order(0)
@RegisterExtension
protected static RestfulServerExtension ourRestfulServer = new RestfulServerExtension(FhirContext.forR4Cached());
@Order(1)
@RegisterExtension
protected static HashMapResourceProviderExtension<Patient> ourPatientProvider = new HashMapResourceProviderExtension<>(ourRestfulServer, Patient.class);
@Order(1)
@RegisterExtension
protected static HashMapResourceProviderExtension<Observation> ourObservationProvider = new HashMapResourceProviderExtension<>(ourRestfulServer, Observation.class);
@Order(1)
@RegisterExtension
protected static TransactionCapturingProviderExtension<Bundle> ourTransactionProvider = new TransactionCapturingProviderExtension<>(ourRestfulServer, Bundle.class);
protected static SingleQueryCountHolder ourCountHolder;
@Order(1)
@RegisterExtension
protected static HashMapResourceProviderExtension<Organization> ourOrganizationProvider = new HashMapResourceProviderExtension<>(ourRestfulServer, Organization.class);
@Autowired
protected SubscriptionTestUtil mySubscriptionTestUtil;
@Autowired
@ -92,12 +88,6 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
@BeforeEach
public void beforeReset() throws Exception {
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourTransactions.clear();
ourContentTypes.clear();
ourHeaders.clear();
// Delete all Subscriptions
if (myClient != null) {
Bundle allSubscriptions = myClient.search().forResource(Subscription.class).returnBundle(Bundle.class).execute();
@ -137,7 +127,7 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
Subscription.SubscriptionChannelComponent channel = subscription.getChannel();
channel.setType(Subscription.SubscriptionChannelType.RESTHOOK);
channel.setPayload(thePayload);
channel.setEndpoint(ourListenerServerBase);
channel.setEndpoint(ourRestfulServer.getBaseUrl());
return subscription;
}
@ -169,55 +159,24 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
return observation;
}
protected Patient sendPatient() {
Patient patient = new Patient();
patient.setActive(true);
public static class ObservationResourceProvider implements IResourceProvider {
@Create
public MethodOutcome create(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourLog.info("Received Listener Create");
ourContentTypes.add(theRequest.getHeader(ca.uhn.fhir.rest.api.Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourCreatedObservations.add(theObservation);
extractHeaders(theRequest);
return new MethodOutcome(new IdType("Observation/1"), true);
}
private void extractHeaders(HttpServletRequest theRequest) {
java.util.Enumeration<String> headerNamesEnum = theRequest.getHeaderNames();
while (headerNamesEnum.hasMoreElements()) {
String nextName = headerNamesEnum.nextElement();
Enumeration<String> valueEnum = theRequest.getHeaders(nextName);
while (valueEnum.hasMoreElements()) {
String nextValue = valueEnum.nextElement();
ourHeaders.add(nextName + ": " + nextValue);
}
}
}
@Override
public Class<? extends IBaseResource> getResourceType() {
return Observation.class;
}
@Update
public MethodOutcome update(@ResourceParam Observation theObservation, HttpServletRequest theRequest) {
ourLog.info("Received Listener Update");
ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", ""));
ourUpdatedObservations.add(theObservation);
extractHeaders(theRequest);
return new MethodOutcome(new IdType("Observation/1"), false);
}
IIdType id = myPatientDao.create(patient).getId();
patient.setId(id);
return patient;
}
public static class PlainProvider {
protected Organization sendOrganization() {
Organization org = new Organization();
org.setName("ORG");
@Transaction
public Bundle transaction(@TransactionParam Bundle theInput) {
ourLog.info("Received transaction update");
ourTransactions.add(theInput);
return theInput;
}
IIdType id = myOrganizationDao.create(org).getId();
org.setId(id);
return org;
}
@AfterAll
@ -229,31 +188,4 @@ public abstract class BaseSubscriptionsR4Test extends BaseResourceProviderR4Test
return ourCountHolder.getQueryCountMap().get("");
}
@BeforeAll
public static void startListenerServer() throws Exception {
RestfulServer ourListenerRestServer = new RestfulServer(FhirContext.forR4());
ourListenerRestServer.registerProvider(new ObservationResourceProvider());
ourListenerRestServer.registerProvider(new PlainProvider());
ourListenerServer = new Server(0);
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourListenerRestServer);
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
ourListenerServer.setHandler(proxyHandler);
JettyUtil.startServer(ourListenerServer);
ourListenerPort = JettyUtil.getPortForStartedServer(ourListenerServer);
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
}
@AfterAll
public static void stopListenerServer() throws Exception {
JettyUtil.closeServer(ourListenerServer);
}
}

View File

@ -11,12 +11,13 @@ import java.util.List;
public class CountingInterceptor implements ChannelInterceptor {
private static final Logger ourLog = LoggerFactory.getLogger(CountingInterceptor.class);
private List<String> mySent = new ArrayList<>();
public int getSentCount(String theContainingKeyword) {
return (int)mySent.stream().filter(t -> t.contains(theContainingKeyword)).count();
return (int) mySent.stream().filter(t -> t.contains(theContainingKeyword)).count();
}
private static final Logger ourLog = LoggerFactory.getLogger(CountingInterceptor.class);
@Override
public void afterSendCompletion(Message<?> theMessage, MessageChannel theChannel, boolean theSent, Exception theException) {
ourLog.info("Counting another instance: {}", theMessage);

View File

@ -83,6 +83,25 @@ public class SubscriptionValidatingInterceptorTest {
}
}
@Test
public void testValidate_RestHook_MultitypeResourceTypeNotSupported() {
when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(false);
Subscription subscription = new Subscription();
subscription.setStatus(Subscription.SubscriptionStatus.ACTIVE);
subscription.setCriteria("[Patient]");
subscription.getChannel().setType(Subscription.SubscriptionChannelType.RESTHOOK);
subscription.getChannel().setPayload("application/fhir+json");
subscription.getChannel().setEndpoint("http://foo");
try {
mySvc.validateSubmittedSubscription(subscription);
fail();
} catch (UnprocessableEntityException e) {
assertThat(e.getMessage(), containsString("Subscription.criteria contains invalid/unsupported resource type: Patient"));
}
}
@Test
public void testValidate_RestHook_NoEndpoint() {
when(myDaoRegistry.isResourceTypeSupported(eq("Patient"))).thenReturn(true);

View File

@ -8,10 +8,20 @@ import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.HapiExtensions;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.CodeableConcept;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Enumerations;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Meta;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.SearchParameter;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@ -68,9 +78,10 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
@ -92,15 +103,14 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
int idx = 0;
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("1", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("1", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
/*
* Send version 2
@ -112,15 +122,14 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
idx++;
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("2", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("2", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
}
@Test
@ -151,9 +160,9 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Send the transaction
mySystemDao.transaction(null, bundle);
waitForSize(1, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(1);
assertThat(ourUpdatedObservations.get(0).getSubject().getReference(), matchesPattern("Patient/[0-9]+"));
assertThat(ourObservationProvider.getStoredResources().get(0).getSubject().getReference(), matchesPattern("Patient/[0-9]+"));
}
@Test
@ -183,14 +192,13 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
int idx = 0;
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("1", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("1", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
/*
* Send version 2
@ -209,14 +217,13 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
idx++;
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(idx));
assertEquals("2", ourUpdatedObservations.get(idx).getIdElement().getVersionIdPart());
assertEquals("2", ourUpdatedObservations.get(idx).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourUpdatedObservations.get(idx).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourUpdatedObservations.get(idx).getIdentifierFirstRep().getValue());
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getMeta().getVersionId());
assertEquals(obs.getMeta().getLastUpdatedElement().getValueAsString(), ourObservationProvider.getStoredResources().get(0).getMeta().getLastUpdatedElement().getValueAsString());
assertEquals("2", ourObservationProvider.getStoredResources().get(0).getIdentifierFirstRep().getValue());
}
@Test
@ -237,7 +244,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
myObservationDao.create(observation);
}
waitForSize(100, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(100);
}
@ -265,7 +272,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
observation.setStatus(Observation.ObservationStatus.FINAL);
myObservationDao.create(observation);
waitForSize(1, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(1);
}
@Test
@ -297,9 +304,9 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
// Send a meta-add
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -312,8 +319,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should be no further deliveries
Thread.sleep(1000);
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
// Send a meta-delete
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -326,8 +333,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should be no further deliveries
Thread.sleep(1000);
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
}
@ -349,9 +356,9 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
// Send a meta-add
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -364,8 +371,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should be no further deliveries
Thread.sleep(1000);
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(3, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(3);
// Send a meta-delete
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -378,8 +385,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should be no further deliveries
Thread.sleep(1000);
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(5, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(5);
}
@ -399,9 +406,9 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
// Send an update with no changes
obs.setId(obs.getIdElement().toUnqualifiedVersionless());
@ -410,8 +417,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should be no further deliveries
Thread.sleep(1000);
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
}
@ -432,11 +439,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
Observation observation1 = sendObservation(code, "SNOMED-CT");
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
IdType idElement = ourUpdatedObservations.get(0).getIdElement();
IdType idElement = ourObservationProvider.getStoredResources().get(0).getIdElement();
assertEquals(observation1.getIdElement().getIdPart(), idElement.getIdPart());
// VersionId is present
assertEquals(observation1.getIdElement().getVersionIdPart(), idElement.getVersionIdPart());
@ -454,11 +461,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
Observation observation2 = sendObservation(code, "SNOMED-CT");
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(1));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
idElement = ourUpdatedObservations.get(1).getIdElement();
idElement = ourObservationProvider.getResourceUpdates().get(1).getIdElement();
assertEquals(observation2.getIdElement().getIdPart(), idElement.getIdPart());
// Now VersionId is stripped
assertEquals(null, idElement.getVersionIdPart());
@ -495,11 +502,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
Observation observation1 = ourUpdatedObservations.get(0);
Observation observation2 = ourUpdatedObservations.get(1);
Observation observation1 = ourObservationProvider.getResourceUpdates().get(0);
Observation observation2 = ourObservationProvider.getResourceUpdates().get(1);
assertEquals("1", observation1.getIdElement().getVersionIdPart());
assertNull(observation1.getNoteFirstRep().getText());
@ -544,11 +551,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
myStoppableSubscriptionDeliveringRestHookSubscriber.unPause();
waitForSize(0, ourCreatedObservations);
waitForSize(2, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(2);
Observation observation1 = ourUpdatedObservations.get(0);
Observation observation2 = ourUpdatedObservations.get(1);
Observation observation1 = ourObservationProvider.getResourceUpdates().get(0);
Observation observation2 = ourObservationProvider.getResourceUpdates().get(1);
assertEquals("2", observation1.getIdElement().getVersionIdPart());
assertEquals("changed", observation1.getNoteFirstRep().getText());
@ -572,11 +579,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourUpdatedObservations.get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
Subscription subscriptionTemp = myClient.read(Subscription.class, subscription2.getId());
assertNotNull(subscriptionTemp);
@ -589,8 +596,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see two subscription notifications
waitForSize(0, ourCreatedObservations);
waitForSize(3, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(3);
myClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute();
waitForActivatedSubscriptionCount(1);
@ -599,8 +606,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see only one subscription notification
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(4);
Observation observation3 = myClient.read(Observation.class, observationTemp3.getId());
CodeableConcept codeableConcept = new CodeableConcept();
@ -612,8 +619,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see no subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(4);
Observation observation3a = myClient.read(Observation.class, observationTemp3.getId());
@ -626,8 +633,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see only one subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(5, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(5);
assertFalse(subscription1.getId().equals(subscription2.getId()));
assertFalse(observation1.getId().isEmpty());
@ -652,11 +659,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourUpdatedObservations.get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
Subscription subscriptionTemp = myClient.read(Subscription.class, subscription2.getId());
assertNotNull(subscriptionTemp);
@ -669,8 +676,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see two subscription notifications
waitForSize(0, ourCreatedObservations);
waitForSize(3, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(3);
myClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute();
waitForQueueToDrain();
@ -679,8 +686,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see only one subscription notification
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(4);
Observation observation3 = myClient.read(Observation.class, observationTemp3.getId());
CodeableConcept codeableConcept = new CodeableConcept();
@ -692,8 +699,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see no subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(4);
Observation observation3a = myClient.read(Observation.class, observationTemp3.getId());
@ -706,8 +713,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see only one subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(5, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(5);
assertFalse(subscription1.getId().equals(subscription2.getId()));
assertFalse(observation1.getId().isEmpty());
@ -730,9 +737,9 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
Observation observation1 = sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_XML_NEW, ourRestfulServer.getRequestContentTypes().get(0));
Subscription subscriptionTemp = myClient.read(Subscription.class, subscription2.getId());
assertNotNull(subscriptionTemp);
@ -744,8 +751,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see two subscription notifications
waitForSize(0, ourCreatedObservations);
waitForSize(3, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(3);
myClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute();
@ -753,8 +760,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see only one subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(4);
Observation observation3 = myClient.read(Observation.class, observationTemp3.getId());
CodeableConcept codeableConcept = new CodeableConcept();
@ -766,8 +773,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see no subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(4, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(4);
Observation observation3a = myClient.read(Observation.class, observationTemp3.getId());
@ -780,14 +787,68 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see only one subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(5, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(5);
assertFalse(subscription1.getId().equals(subscription2.getId()));
assertFalse(observation1.getId().isEmpty());
assertFalse(observation2.getId().isEmpty());
}
@Test
public void testRestHookSubscriptionStarCriteria() throws Exception {
String payload = "application/json";
String code = "1000000050";
String criteria1 = "[*]";
createSubscription(criteria1, payload);
waitForActivatedSubscriptionCount(1);
sendObservation(code, "SNOMED-CT");
sendPatient();
waitForQueueToDrain();
// Should see 1 subscription notification for each type
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
ourPatientProvider.waitForCreateCount(0);
ourPatientProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
}
@Test
public void testRestHookSubscriptionMultiTypeCriteria() throws Exception {
String payload = "application/json";
String code = "1000000050";
String criteria1 = "[Observation,Patient]";
createSubscription(criteria1, payload);
waitForActivatedSubscriptionCount(1);
sendOrganization();
sendObservation(code, "SNOMED-CT");
sendPatient();
waitForQueueToDrain();
// Should see 1 subscription notification for each type
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
ourPatientProvider.waitForCreateCount(0);
ourPatientProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(1));
ourOrganizationProvider.waitForCreateCount(0);
ourOrganizationProvider.waitForUpdateCount(0);
}
@Test
public void testSubscriptionTriggerViaSubscription() throws Exception {
String payload = "application/xml";
@ -831,11 +892,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
myClient.transaction().withBundle(requestBundle).execute();
// Should see 1 subscription notification
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_XML_NEW, ourRestfulServer.getRequestContentTypes().get(0));
Observation obs = ourUpdatedObservations.get(0);
Observation obs = ourObservationProvider.getStoredResources().get(0);
ourLog.info("Observation content: {}", myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(obs));
}
@ -857,7 +918,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Criteria didn't match, shouldn't see any updates
waitForQueueToDrain();
Thread.sleep(1000);
assertEquals(0, ourUpdatedObservations.size());
assertEquals(0, ourObservationProvider.getCountUpdate());
Subscription subscriptionTemp = myClient.read().resource(Subscription.class).withId(subscription2.getId()).execute();
assertNotNull(subscriptionTemp);
@ -872,8 +933,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see a subscription notification this time
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
myClient.delete().resourceById(new IdType("Subscription/" + subscription2.getId())).execute();
@ -881,7 +942,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// No more matches
Thread.sleep(1000);
assertEquals(1, ourUpdatedObservations.size());
assertEquals(1, ourObservationProvider.getCountUpdate());
}
@Test
@ -900,9 +961,9 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_XML_NEW, ourRestfulServer.getRequestContentTypes().get(0));
}
@Test
@ -940,11 +1001,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertThat(ourHeaders, hasItem("X-Foo: FOO"));
assertThat(ourHeaders, hasItem("X-Bar: BAR"));
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertThat(ourRestfulServer.getRequestHeaders().get(0), hasItem("X-Foo: FOO"));
assertThat(ourRestfulServer.getRequestHeaders().get(0), hasItem("X-Bar: BAR"));
}
@Test
@ -961,8 +1022,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
// Disable
subscription.setStatus(Subscription.SubscriptionStatus.OFF);
@ -974,8 +1035,8 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(0, ourObservationProvider.getCountCreate());
ourObservationProvider.waitForUpdateCount(1);
}
@ -1096,7 +1157,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
MethodOutcome methodOutcome = myClient.create().resource(bodySite).execute();
assertEquals(true, methodOutcome.getCreated());
waitForQueueToDrain();
waitForSize(1, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(1);
}
{
Observation observation = new Observation();
@ -1104,14 +1165,14 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
MethodOutcome methodOutcome = myClient.create().resource(observation).execute();
assertEquals(true, methodOutcome.getCreated());
waitForQueueToDrain();
waitForSize(2, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(2);
}
{
Observation observation = new Observation();
MethodOutcome methodOutcome = myClient.create().resource(observation).execute();
assertEquals(true, methodOutcome.getCreated());
waitForQueueToDrain();
waitForSize(2, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(2);
}
{
Observation observation = new Observation();
@ -1119,7 +1180,7 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
MethodOutcome methodOutcome = myClient.create().resource(observation).execute();
assertEquals(true, methodOutcome.getCreated());
waitForQueueToDrain();
waitForSize(2, ourUpdatedObservations);
ourObservationProvider.waitForUpdateCount(2);
}
}
@ -1148,14 +1209,11 @@ public class RestHookTestR4Test extends BaseSubscriptionsR4Test {
assertEquals(true, methodOutcome.getCreated());
waitForQueueToDrain();
waitForSize(1, ourTransactions);
ourLog.info("Received transaction: {}", myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(ourTransactions.get(0)));
ourTransactionProvider.waitForTransactionCount(1);
Bundle xact = ourTransactions.get(0);
Bundle xact = ourTransactionProvider.getTransactions().get(0);
assertEquals(2, xact.getEntry().size());
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(ourTransactions.get(0)));
}
}

View File

@ -103,10 +103,11 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
assertEquals("Observation/A", ourUpdatedObservations.get(0).getId());
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("Observation/A/_history/1", ourObservationProvider.getStoredResources().get(0).getId());
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
}
@ -125,12 +126,12 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
sendObservation();
deliveryLatch.await(10, TimeUnit.SECONDS);
assertEquals(0, ourCreatedObservations.size());
assertEquals(1, ourUpdatedObservations.size());
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertTrue(ourHitBeforeRestHookDelivery);
assertTrue(ourHitAfterRestHookDelivery);
assertThat(ourHeaders, hasItem("X-Foo: Bar"));
assertThat(ourRestfulServer.getRequestHeaders().get(0), hasItem("X-Foo: Bar"));
}
@Test
@ -174,7 +175,7 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
sendObservation();
Thread.sleep(1000);
assertEquals(0, ourUpdatedObservations.size());
ourObservationProvider.waitForUpdateCount(0);
}
@Test
@ -242,11 +243,11 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0));
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(1);
assertEquals(Constants.CT_FHIR_JSON_NEW, ourRestfulServer.getRequestContentTypes().get(0));
assertEquals("1", ourUpdatedObservations.get(0).getIdElement().getVersionIdPart());
assertEquals("1", ourObservationProvider.getStoredResources().get(0).getIdElement().getVersionIdPart());
Subscription subscriptionTemp = myClient.read(Subscription.class, subscription2.getId());
assertNotNull(subscriptionTemp);
@ -259,8 +260,8 @@ public class RestHookWithInterceptorR4Test extends BaseSubscriptionsR4Test {
waitForQueueToDrain();
// Should see two subscription notifications
waitForSize(0, ourCreatedObservations);
waitForSize(3, ourUpdatedObservations);
ourObservationProvider.waitForCreateCount(0);
ourObservationProvider.waitForUpdateCount(3);
ourLog.info("Messages:\n " + messages.stream().collect(Collectors.joining("\n ")));

View File

@ -54,7 +54,6 @@ import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.text.StringTokenizer;
import org.apache.commons.text.matcher.StringMatcher;
import org.fhir.ucum.Pair;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBase;
@ -1556,8 +1555,7 @@ public abstract class BaseSearchParamExtractor implements ISearchParamExtractor
@Nonnull
public static String[] splitPathsR4(@Nonnull String thePaths) {
StringTokenizer tok = new StringTokenizer(thePaths, " |");
StringMatcher trimmerMatcher = (buffer, start, bufferStart, bufferEnd) -> (buffer[start] <= 32) ? 1 : 0;
tok.setTrimmerMatcher(trimmerMatcher);
tok.setTrimmerMatcher(new StringTrimmingTrimmerMatcher());
return tok.getTokenArray();
}

View File

@ -0,0 +1,15 @@
package ca.uhn.fhir.jpa.searchparam.extractor;
import org.apache.commons.text.matcher.StringMatcher;
/**
* Utility class that works with the commons-text
* {@link org.apache.commons.text.StringTokenizer}
* class to return tokens that are whitespace trimmed.
*/
public class StringTrimmingTrimmerMatcher implements StringMatcher {
@Override
public int isMatch(char[] buffer, int start, int bufferStart, int bufferEnd) {
return (buffer[start] <= 32) ? 1 : 0;
}
}

View File

@ -22,6 +22,8 @@ package ca.uhn.fhir.jpa.subscription.match.matcher.matching;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.rest.api.Constants;
import org.springframework.beans.factory.annotation.Autowired;
public class SubscriptionStrategyEvaluator {
@ -37,9 +39,16 @@ public class SubscriptionStrategyEvaluator {
}
public SubscriptionMatchingStrategy determineStrategy(String theCriteria) {
InMemoryMatchResult result = myInMemoryResourceMatcher.canBeEvaluatedInMemory(theCriteria);
if (result.supported()) {
return SubscriptionMatchingStrategy.IN_MEMORY;
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(theCriteria);
if (criteria != null) {
if (criteria.getCriteria() != null) {
InMemoryMatchResult result = myInMemoryResourceMatcher.canBeEvaluatedInMemory(theCriteria);
if (result.supported()) {
return SubscriptionMatchingStrategy.IN_MEMORY;
}
} else {
return SubscriptionMatchingStrategy.IN_MEMORY;
}
}
return SubscriptionMatchingStrategy.DATABASE;
}

View File

@ -0,0 +1,117 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import ca.uhn.fhir.jpa.searchparam.extractor.StringTrimmingTrimmerMatcher;
import ca.uhn.fhir.rest.api.Constants;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.commons.text.StringTokenizer;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.trim;
public enum SubscriptionCriteriaParser {
;
public enum TypeEnum {
/**
* Normal search URL expression
*/
SEARCH_EXPRESSION,
/**
* Collection of resource types
*/
MULTITYPE_EXPRESSION,
/**
* All types
*/
STARTYPE_EXPRESSION
}
public static class SubscriptionCriteria {
private final TypeEnum myType;
private final String myCriteria;
private final Set<String> myApplicableResourceTypes;
private SubscriptionCriteria(TypeEnum theType, String theCriteria, Set<String> theApplicableResourceTypes) {
myType = theType;
myCriteria = theCriteria;
myApplicableResourceTypes = theApplicableResourceTypes;
}
@Override
public String toString() {
ToStringBuilder retVal = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
retVal.append("type", myType);
if (isNotBlank(myCriteria)) {
retVal.append("criteria", myCriteria);
}
if (myApplicableResourceTypes != null) {
retVal.append("applicableResourceTypes", myApplicableResourceTypes);
}
return retVal.toString();
}
public TypeEnum getType() {
return myType;
}
public String getCriteria() {
return myCriteria;
}
public Set<String> getApplicableResourceTypes() {
return myApplicableResourceTypes;
}
}
@Nullable
public static SubscriptionCriteria parse(String theCriteria) {
String criteria = trim(theCriteria);
if (isBlank(criteria)) {
return null;
}
if (criteria.startsWith(Constants.SUBSCRIPTION_MULTITYPE_PREFIX)) {
if (criteria.endsWith(Constants.SUBSCRIPTION_MULTITYPE_SUFFIX)) {
String multitypeExpression = criteria.substring(1, criteria.length() - 1);
StringTokenizer tok = new StringTokenizer(multitypeExpression, ",");
tok.setTrimmerMatcher(new StringTrimmingTrimmerMatcher());
List<String> types = tok.getTokenList();
if (types.isEmpty()) {
return null;
}
if (types.contains(Constants.SUBSCRIPTION_MULTITYPE_STAR)) {
return new SubscriptionCriteria(TypeEnum.STARTYPE_EXPRESSION, null, null);
}
Set<String> typesSet = Sets.newHashSet(types);
return new SubscriptionCriteria(TypeEnum.MULTITYPE_EXPRESSION, null, typesSet);
}
}
if (Character.isLetter(criteria.charAt(0))) {
String criteriaType = criteria;
int questionMarkIdx = criteriaType.indexOf('?');
if (questionMarkIdx > 0) {
criteriaType = criteriaType.substring(0, questionMarkIdx);
}
Set<String> types = Collections.singleton(criteriaType);
return new SubscriptionCriteria(TypeEnum.SEARCH_EXPRESSION, criteria, types);
}
return null;
}
}

View File

@ -15,7 +15,6 @@ import ca.uhn.fhir.jpa.subscription.model.ResourceDeliveryMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.rest.api.EncodingEnum;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
@ -136,18 +135,24 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
}
}
if (!validCriteria(nextActiveSubscription, resourceId)) {
if (!resourceTypeIsAppropriateForSubscription(nextActiveSubscription, resourceId)) {
continue;
}
InMemoryMatchResult matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
continue;
InMemoryMatchResult matchResult;
if (nextActiveSubscription.getCriteria().getType() == SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
if (!matchResult.matched()) {
continue;
}
ourLog.debug("Subscription {} was matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
} else {
matchResult = InMemoryMatchResult.successfulMatch();
matchResult.setInMemory(true);
}
ourLog.debug("Subscription {} was matched by resource {} {}",
nextActiveSubscription.getId(),
resourceId.toUnqualifiedVersionless().getValue(),
matchResult.isInMemory() ? "in-memory" : "by querying the repository");
IBaseResource payload = theMsg.getNewPayload(myFhirContext);
CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
@ -215,28 +220,26 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
return theActiveSubscription.getId();
}
private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
String criteriaString = theActiveSubscription.getCriteriaString();
private boolean resourceTypeIsAppropriateForSubscription(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
SubscriptionCriteriaParser.SubscriptionCriteria criteria = theActiveSubscription.getCriteria();
String subscriptionId = getId(theActiveSubscription);
String resourceType = theResourceId.getResourceType();
if (StringUtils.isBlank(criteriaString)) {
return false;
}
// see if the criteria matches the created object
ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteriaString);
String criteriaResource = criteriaString;
int index = criteriaResource.indexOf("?");
if (index != -1) {
criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
}
ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteria);
if (resourceType != null && !criteriaResource.equals(resourceType)) {
ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, criteriaString);
if (criteria == null) {
return false;
}
return true;
switch (criteria.getType()) {
default:
case SEARCH_EXPRESSION:
case MULTITYPE_EXPRESSION:
return criteria.getApplicableResourceTypes().contains(resourceType);
case STARTYPE_EXPRESSION:
return !resourceType.equals("Subscription");
}
}
}

View File

@ -20,41 +20,41 @@ package ca.uhn.fhir.jpa.subscription.match.registry;
* #L%
*/
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ActiveSubscription {
private static final Logger ourLog = LoggerFactory.getLogger(ActiveSubscription.class);
private CanonicalSubscription mySubscription;
private SubscriptionCriteriaParser.SubscriptionCriteria myCriteria;
private final String myChannelName;
private final String myId;
private CanonicalSubscription mySubscription;
private boolean flagForDeletion;
public ActiveSubscription(CanonicalSubscription theSubscription, String theChannelName) {
mySubscription = theSubscription;
myChannelName = theChannelName;
myId = theSubscription.getIdPart();
setSubscription(theSubscription);
}
public SubscriptionCriteriaParser.SubscriptionCriteria getCriteria() {
return myCriteria;
}
public CanonicalSubscription getSubscription() {
return mySubscription;
}
public final void setSubscription(CanonicalSubscription theSubscription) {
mySubscription = theSubscription;
myCriteria = SubscriptionCriteriaParser.parse(theSubscription.getCriteriaString());
}
public String getChannelName() {
return myChannelName;
}
public String getCriteriaString() {
return mySubscription.getCriteriaString();
}
public void setSubscription(CanonicalSubscription theCanonicalizedSubscription) {
mySubscription = theCanonicalizedSubscription;
}
public boolean isFlagForDeletion() {
return flagForDeletion;
}

View File

@ -25,9 +25,9 @@ import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy;
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionCriteriaParser;
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
@ -128,6 +128,25 @@ public class SubscriptionValidatingInterceptor {
throw new UnprocessableEntityException(theFieldName + " must be populated");
}
SubscriptionCriteriaParser.SubscriptionCriteria parsedCriteria = SubscriptionCriteriaParser.parse(theQuery);
if (parsedCriteria == null) {
throw new UnprocessableEntityException(theFieldName + " can not be parsed");
}
if (parsedCriteria.getType() == SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION) {
return;
}
for (String next : parsedCriteria.getApplicableResourceTypes()) {
if (!myDaoRegistry.isResourceTypeSupported(next)) {
throw new UnprocessableEntityException(theFieldName + " contains invalid/unsupported resource type: " + next);
}
}
if (parsedCriteria.getType() != SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION) {
return;
}
int sep = theQuery.indexOf('?');
if (sep <= 1) {
throw new UnprocessableEntityException(theFieldName + " must be in the form \"{Resource Type}?[params]\"");
@ -138,9 +157,6 @@ public class SubscriptionValidatingInterceptor {
throw new UnprocessableEntityException(theFieldName + " must be in the form \"{Resource Type}?[params]\"");
}
if (!myDaoRegistry.isResourceTypeSupported(resType)) {
throw new UnprocessableEntityException(theFieldName + " contains invalid/unsupported resource type: " + resType);
}
}
public void validateMessageSubscriptionEndpoint(String theEndpointUrl) {

View File

@ -0,0 +1,60 @@
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.*;
public class SubscriptionCriteriaParserTest {
@Test
public void testSearchExpression() {
String expression = "Patient?foo=bar";
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(expression);
assertEquals(SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION, criteria.getType());
assertEquals(expression, criteria.getCriteria());
assertThat(criteria.getApplicableResourceTypes(), containsInAnyOrder("Patient"));
assertEquals("SubscriptionCriteriaParser.SubscriptionCriteria[type=SEARCH_EXPRESSION,criteria=Patient?foo=bar,applicableResourceTypes=[Patient]]", criteria.toString());
}
@Test
public void testTypeExpression() {
String expression = "Patient";
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(expression);
assertEquals(SubscriptionCriteriaParser.TypeEnum.SEARCH_EXPRESSION, criteria.getType());
assertEquals(expression, criteria.getCriteria());
assertThat(criteria.getApplicableResourceTypes(), containsInAnyOrder("Patient"));
assertEquals("SubscriptionCriteriaParser.SubscriptionCriteria[type=SEARCH_EXPRESSION,criteria=Patient,applicableResourceTypes=[Patient]]", criteria.toString());
}
@Test
public void testStarExpression() {
String expression = "[*]";
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(expression);
assertEquals(SubscriptionCriteriaParser.TypeEnum.STARTYPE_EXPRESSION, criteria.getType());
assertEquals(null, criteria.getCriteria());
assertEquals(null, criteria.getApplicableResourceTypes());
assertEquals("SubscriptionCriteriaParser.SubscriptionCriteria[type=STARTYPE_EXPRESSION]", criteria.toString());
}
@Test
public void testMultitypeExpression() {
String expression = "[Patient , Observation]";
SubscriptionCriteriaParser.SubscriptionCriteria criteria = SubscriptionCriteriaParser.parse(expression);
assertEquals(SubscriptionCriteriaParser.TypeEnum.MULTITYPE_EXPRESSION, criteria.getType());
assertEquals(null, criteria.getCriteria());
assertThat(criteria.getApplicableResourceTypes(), containsInAnyOrder("Patient", "Observation"));
assertEquals("SubscriptionCriteriaParser.SubscriptionCriteria[type=MULTITYPE_EXPRESSION,applicableResourceTypes=[Observation, Patient]]", criteria.toString());
}
@Test
public void testInvalidExpression() {
assertNull(SubscriptionCriteriaParser.parse("[]"));
assertNull(SubscriptionCriteriaParser.parse(""));
assertNull(SubscriptionCriteriaParser.parse(null));
assertNull(SubscriptionCriteriaParser.parse(" "));
assertNull(SubscriptionCriteriaParser.parse("#123"));
}
}

View File

@ -14,14 +14,14 @@ public class SubscriptionRegistryTest extends BaseSubscriptionRegistryTest {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertRegistrySize(1);
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteria().getCriteria());
subscription.setCriteria(NEW_CRITERIA);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteria().getCriteria());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertRegistrySize(1);
ActiveSubscription newActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteriaString());
assertEquals(NEW_CRITERIA, newActiveSubscription.getCriteria().getCriteria());
// The same object
assertTrue(newActiveSubscription == origActiveSubscription);
}
@ -34,11 +34,11 @@ public class SubscriptionRegistryTest extends BaseSubscriptionRegistryTest {
assertRegistrySize(1);
ActiveSubscription origActiveSubscription = mySubscriptionRegistry.get(SUBSCRIPTION_ID);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteria().getCriteria());
setChannel(subscription, Subscription.SubscriptionChannelType.EMAIL);
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteriaString());
assertEquals(ORIG_CRITERIA, origActiveSubscription.getCriteria().getCriteria());
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(subscription);
assertRegistrySize(1);

View File

@ -77,4 +77,30 @@ public class SubscriptionMatchingSubscriberTest extends BaseBlockingQueueSubscri
assertEquals(0, ourContentTypes.size());
}
@Test
public void testCriteriaStarOnly() throws InterruptedException {
String payload = "application/fhir+xml";
String code = "1000000050";
String criteria1 = "[*]";
String criteria2 = "[*]";
String criteria3 = "Observation?code=FOO"; // won't match
sendSubscription(criteria1, payload, ourListenerServerBase);
sendSubscription(criteria2, payload, ourListenerServerBase);
sendSubscription(criteria3, payload, ourListenerServerBase);
assertEquals(3, mySubscriptionRegistry.size());
ourObservationListener.setExpectedCount(2);
sendObservation(code, "SNOMED-CT");
ourObservationListener.awaitExpected();
assertEquals(2, ourContentTypes.size());
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
}
}

View File

@ -20,15 +20,28 @@ package ca.uhn.fhir.test.utilities.server;
* #L%
*/
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.provider.HashMapResourceProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class HashMapResourceProviderExtension<T extends IBaseResource> extends HashMapResourceProvider<T> implements BeforeEachCallback, AfterEachCallback {
private final RestfulServerExtension myRestfulServerExtension;
private boolean myClearBetweenTests = true;
private final List<T> myUpdates = new ArrayList<>();
/**
* Constructor
@ -46,9 +59,47 @@ public class HashMapResourceProviderExtension<T extends IBaseResource> extends H
myRestfulServerExtension.getRestfulServer().unregisterProvider(HashMapResourceProviderExtension.this);
}
@Override
public synchronized MethodOutcome update(T theResource, String theConditional, RequestDetails theRequestDetails) {
T resourceClone = getFhirContext().newTerser().clone(theResource);
myUpdates.add(resourceClone);
return super.update(theResource, theConditional, theRequestDetails);
}
@Override
public synchronized void clear() {
super.clear();
if (myUpdates != null) {
myUpdates.clear();
}
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
clear();
if (myClearBetweenTests) {
clear();
clearCounts();
}
myRestfulServerExtension.getRestfulServer().registerProvider(HashMapResourceProviderExtension.this);
}
public HashMapResourceProviderExtension<T> dontClearBetweenTests() {
myClearBetweenTests = false;
return this;
}
public void waitForUpdateCount(long theCount) {
assertThat(theCount, greaterThanOrEqualTo(getCountUpdate()));
await().until(()->getCountUpdate(), equalTo(theCount));
}
public void waitForCreateCount(long theCount) {
assertThat(theCount, greaterThanOrEqualTo(getCountCreate()));
await().until(()->getCountCreate(), equalTo(theCount));
}
public List<T> getResourceUpdates() {
return Collections.unmodifiableList(myUpdates);
}
}

View File

@ -22,6 +22,10 @@ package ca.uhn.fhir.test.utilities.server;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Interceptor;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.ServerValidationModeEnum;
import ca.uhn.fhir.rest.server.RestfulServer;
@ -33,23 +37,28 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class RestfulServerExtension implements BeforeEachCallback, AfterEachCallback {
private static final Logger ourLog = LoggerFactory.getLogger(RestfulServerExtension.class);
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class RestfulServerExtension implements BeforeEachCallback, AfterEachCallback, AfterAllCallback {
private static final Logger ourLog = LoggerFactory.getLogger(RestfulServerExtension.class);
private final List<List<String>> myRequestHeaders = new ArrayList<>();
private final List<String> myRequestContentTypes = new ArrayList<>();
private FhirContext myFhirContext;
private List<Object> myProviders = new ArrayList<>();
private FhirVersionEnum myFhirVersion;
@ -60,6 +69,7 @@ public class RestfulServerExtension implements BeforeEachCallback, AfterEachCall
private IGenericClient myFhirClient;
private List<Consumer<RestfulServer>> myConsumers = new ArrayList<>();
private String myServletPath = "/*";
private boolean myKeepAliveBetweenTests;
/**
* Constructor
@ -87,6 +97,9 @@ public class RestfulServerExtension implements BeforeEachCallback, AfterEachCall
}
private void stopServer() throws Exception {
if (myServer == null) {
return;
}
JettyUtil.closeServer(myServer);
myServer = null;
myFhirClient = null;
@ -96,10 +109,15 @@ public class RestfulServerExtension implements BeforeEachCallback, AfterEachCall
}
private void startServer() throws Exception {
if (myServer != null) {
return;
}
myServer = new Server(myPort);
myServlet = new RestfulServer(myFhirContext);
myServlet.setDefaultPrettyPrint(true);
myServlet.registerInterceptor(new ListenerExtension());
if (myProviders != null) {
myServlet.registerProviders(myProviders);
}
@ -124,7 +142,6 @@ public class RestfulServerExtension implements BeforeEachCallback, AfterEachCall
myFhirClient = myFhirContext.newRestfulGenericClient("http://localhost:" + myPort);
}
public IGenericClient getFhirClient() {
return myFhirClient;
}
@ -142,15 +159,34 @@ public class RestfulServerExtension implements BeforeEachCallback, AfterEachCall
return myPort;
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
stopServer();
public List<String> getRequestContentTypes() {
return myRequestContentTypes;
}
public List<List<String>> getRequestHeaders() {
return myRequestHeaders;
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
createContextIfNeeded();
startServer();
myRequestContentTypes.clear();
myRequestHeaders.clear();
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
if (!myKeepAliveBetweenTests) {
stopServer();
}
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
if (myKeepAliveBetweenTests) {
stopServer();
}
}
public RestfulServerExtension registerProvider(Object theProvider) {
@ -188,4 +224,43 @@ public class RestfulServerExtension implements BeforeEachCallback, AfterEachCall
myPort = thePort;
return this;
}
public RestfulServerExtension keepAliveBetweenTests() {
myKeepAliveBetweenTests = true;
return this;
}
public String getBaseUrl() {
return "http://localhost:" + myPort;
}
@Interceptor
private class ListenerExtension {
@Hook(Pointcut.SERVER_INCOMING_REQUEST_POST_PROCESSED)
public void postProcessed(HttpServletRequest theRequest) {
String header = theRequest.getHeader(Constants.HEADER_CONTENT_TYPE);
if (isNotBlank(header)) {
myRequestContentTypes.add(header.replaceAll(";.*", ""));
} else {
myRequestContentTypes.add(null);
}
java.util.Enumeration<String> headerNamesEnum = theRequest.getHeaderNames();
List<String> requestHeaders = new ArrayList<>();
myRequestHeaders.add(requestHeaders);
while (headerNamesEnum.hasMoreElements()) {
String nextName = headerNamesEnum.nextElement();
Enumeration<String> valueEnum = theRequest.getHeaders(nextName);
while (valueEnum.hasMoreElements()) {
String nextValue = valueEnum.nextElement();
requestHeaders.add(nextName + ": " + nextValue);
}
}
}
}
}

View File

@ -0,0 +1,68 @@
package ca.uhn.fhir.test.utilities.server;
import ca.uhn.fhir.rest.annotation.Transaction;
import ca.uhn.fhir.rest.annotation.TransactionParam;
import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class TransactionCapturingProviderExtension<T extends IBaseBundle> implements BeforeEachCallback, AfterEachCallback {
private static final Logger ourLog = LoggerFactory.getLogger(TransactionCapturingProviderExtension.class);
private final RestfulServerExtension myRestfulServerExtension;
private final List<T> myInputBundles = Collections.synchronizedList(new ArrayList<>());
private PlainProvider myProvider;
/**
* Constructor
*/
public TransactionCapturingProviderExtension(RestfulServerExtension theRestfulServerExtension, Class<T> theBundleType) {
myRestfulServerExtension = theRestfulServerExtension;
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
myProvider = new PlainProvider();
myRestfulServerExtension.getRestfulServer().unregisterProvider(myProvider);
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
myRestfulServerExtension.getRestfulServer().registerProvider(myProvider);
myInputBundles.clear();
}
public void waitForTransactionCount(int theCount) {
assertThat(theCount, greaterThanOrEqualTo(myInputBundles.size()));
await().until(()->myInputBundles.size(), equalTo(theCount));
}
public List<T> getTransactions() {
return Collections.unmodifiableList(myInputBundles);
}
private class PlainProvider {
@Transaction
public T transaction(@TransactionParam T theInput) {
ourLog.info("Received transaction update");
myInputBundles.add(theInput);
return theInput;
}
}
}