Subscription cleanup trying to get tests passing again

This commit is contained in:
James Agnew 2018-08-13 10:25:16 -04:00
parent 39ef79ff2c
commit 8a19682ce3
6 changed files with 181 additions and 119 deletions

View File

@ -335,7 +335,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
return mySubscriptionDao;
}
public List<CanonicalSubscription> getSubscriptions() {
public List<CanonicalSubscription> getRegisteredSubscriptions() {
return new ArrayList<>(myIdToSubscription.values());
}
@ -444,6 +444,10 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
@Override
public void resourceUpdated(RequestDetails theRequest, IBaseResource theOldResource, IBaseResource theNewResource) {
submitResourceModifiedForUpdate(theNewResource);
}
void submitResourceModifiedForUpdate(IBaseResource theNewResource) {
ResourceModifiedMessage msg = new ResourceModifiedMessage();
msg.setId(theNewResource.getIdElement());
msg.setOperationType(RestOperationTypeEnum.UPDATE);
@ -457,13 +461,18 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
/*
* We only actually submit this item work working after the
*/
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
ourLog.trace("Sending resource modified message to processing channel");
getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
}
});
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
ourLog.trace("Sending resource modified message to processing channel");
getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
}
});
} else {
ourLog.trace("Sending resource modified message to processing channel");
getProcessingChannel().send(new ResourceModifiedJsonMessage(theMessage));
}
}
@VisibleForTesting

View File

@ -142,16 +142,18 @@ public class SubscriptionActivatingSubscriber {
private void activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement());
ourLog.info("Activating and subscription {} from status {} to {} for channel {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus, myChannelType);
ourLog.info("Activating subscription {} from status {} to {} for channel {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus, myChannelType);
try {
SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
mySubscriptionDao.update(subscription);
subscription = mySubscriptionDao.update(subscription).getResource();
mySubscriptionInterceptor.submitResourceModifiedForUpdate(subscription);
} catch (final UnprocessableEntityException e) {
ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
SubscriptionUtil.setStatus(myCtx, subscription, "error");
SubscriptionUtil.setReason(myCtx, subscription, e.getMessage());
mySubscriptionDao.update(subscription);
}
}
@SuppressWarnings("EnumSwitchStatementWhichMissesCases")

View File

@ -70,7 +70,7 @@ public class SubscriptionCheckingSubscriber extends BaseSubscriptionSubscriber {
String resourceType = id.getResourceType();
String resourceId = id.getIdPart();
List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getSubscriptions();
List<CanonicalSubscription> subscriptions = getSubscriptionInterceptor().getRegisteredSubscriptions();
ourLog.trace("Testing {} subscriptions for applicability");

View File

@ -8,7 +8,6 @@ import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.subscription.resthook.SubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.jpa.util.SingleItemLoadingCache;
import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainR4;
import ca.uhn.fhir.narrative.DefaultThymeleafNarrativeGenerator;
import ca.uhn.fhir.parser.StrictErrorHandler;
@ -44,7 +43,6 @@ import org.springframework.web.servlet.DispatcherServlet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@ -52,7 +50,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
protected static JpaValidationSupportChainR4 myValidationSupport;
protected IGenericClient myClient;
protected static CloseableHttpClient ourHttpClient;
protected static int ourPort;
protected static RestfulServer ourRestServer;
@ -61,17 +58,84 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
protected static DatabaseBackedPagingProvider ourPagingProvider;
protected static ISearchDao mySearchEntityDao;
protected static ISearchCoordinatorSvc mySearchCoordinatorSvc;
private static Server ourServer;
protected static GenericWebApplicationContext ourWebApplicationContext;
private static Server ourServer;
protected IGenericClient myClient;
protected ResourceCountCache ourResourceCountsCache;
private TerminologyUploaderProviderR4 myTerminologyUploaderProvider;
private Object ourGraphQLProvider;
private boolean ourRestHookSubscriptionInterceptorRequested;
protected ResourceCountCache ourResourceCountsCache;
public BaseResourceProviderR4Test() {
super();
}
@AfterClass
public static void afterClassClearContextBaseResourceProviderR4Test() throws Exception {
ourServer.stop();
ourHttpClient.close();
ourServer = null;
ourHttpClient = null;
myValidationSupport.flush();
myValidationSupport = null;
ourWebApplicationContext.close();
ourWebApplicationContext = null;
TestUtil.clearAllStaticFieldsForUnitTest();
}
public static int getNumberOfParametersByName(Parameters theParameters, String theName) {
int retVal = 0;
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
retVal++;
}
}
return retVal;
}
public static ParametersParameterComponent getParameterByName(Parameters theParameters, String theName) {
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
return param;
}
}
return new ParametersParameterComponent();
}
public static List<ParametersParameterComponent> getParametersByName(Parameters theParameters, String theName) {
List<ParametersParameterComponent> params = new ArrayList<>();
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
params.add(param);
}
}
return params;
}
public static ParametersParameterComponent getPartByName(ParametersParameterComponent theParameter, String theName) {
for (ParametersParameterComponent part : theParameter.getPart()) {
if (part.getName().equals(theName)) {
return part;
}
}
return new ParametersParameterComponent();
}
public static boolean hasParameterByName(Parameters theParameters, String theName) {
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
return true;
}
}
return false;
}
@After
public void after() throws Exception {
myFhirCtx.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.ONCE);
@ -128,7 +192,7 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
subsServletHolder.setServlet(dispatcherServlet);
subsServletHolder.setInitParameter(
ContextLoader.CONFIG_LOCATION_PARAM,
WebsocketDispatcherConfig.class.getName());
WebsocketDispatcherConfig.class.getName());
proxyHandler.addServlet(subsServletHolder, "/*");
// Register a CORS filter
@ -204,69 +268,10 @@ public abstract class BaseResourceProviderR4Test extends BaseJpaR4Test {
return names;
}
@AfterClass
public static void afterClassClearContextBaseResourceProviderR4Test() throws Exception {
ourServer.stop();
ourHttpClient.close();
ourServer = null;
ourHttpClient = null;
myValidationSupport.flush();
myValidationSupport = null;
ourWebApplicationContext.close();
ourWebApplicationContext = null;
TestUtil.clearAllStaticFieldsForUnitTest();
protected void waitForRegisteredSubscriptionCount(int theSize) throws Exception {
SubscriptionRestHookInterceptor interceptor = getRestHookSubscriptionInterceptor();
TestUtil.waitForSize(theSize, () -> interceptor.getRegisteredSubscriptions().size());
Thread.sleep(500);
}
public static int getNumberOfParametersByName(Parameters theParameters, String theName) {
int retVal = 0;
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
retVal++;
}
}
return retVal;
}
public static ParametersParameterComponent getParameterByName(Parameters theParameters, String theName) {
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
return param;
}
}
return new ParametersParameterComponent();
}
public static List<ParametersParameterComponent> getParametersByName(Parameters theParameters, String theName) {
List<ParametersParameterComponent> params = new ArrayList<>();
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
params.add(param);
}
}
return params;
}
public static ParametersParameterComponent getPartByName(ParametersParameterComponent theParameter, String theName) {
for (ParametersParameterComponent part : theParameter.getPart()) {
if (part.getName().equals(theName)) {
return part;
}
}
return new ParametersParameterComponent();
}
public static boolean hasParameterByName(Parameters theParameters, String theName) {
for (ParametersParameterComponent param : theParameters.getParameter()) {
if (param.getName().equals(theName)) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,21 @@
package ca.uhn.fhir.jpa.subscription.r4;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
public class CountingInterceptor extends ChannelInterceptorAdapter {
private int mySentCount;
public int getSentCount() {
return mySentCount;
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
if (sent) {
mySentCount++;
}
}
}

View File

@ -2,9 +2,9 @@ package ca.uhn.fhir.jpa.subscription.r4;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.JpaConformanceProviderDstu2;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.RestHookTestDstu2Test;
import ca.uhn.fhir.jpa.subscription.resthook.SubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
@ -14,7 +14,9 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.util.PortUtil;
import ca.uhn.fhir.util.TestUtil;
import com.google.common.collect.Lists;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -23,6 +25,7 @@ import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.*;
import org.junit.*;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
@ -49,6 +52,34 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
private static List<String> ourContentTypes = new ArrayList<>();
private static List<String> ourHeaders = new ArrayList<>();
private List<IIdType> mySubscriptionIds = new ArrayList<>();
private CountingInterceptor myCountingInterceptor;
@BeforeClass
public static void startListenerServer() throws Exception {
ourListenerPort = PortUtil.findFreePort();
ourListenerRestServer = new RestfulServer(FhirContext.forR4());
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener);
ourListenerServer = new Server(ourListenerPort);
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourListenerRestServer);
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
ourListenerServer.setHandler(proxyHandler);
ourListenerServer.start();
}
@AfterClass
public static void stopListenerServer() throws Exception {
ourListenerServer.stop();
}
@After
public void afterUnregisterRestHookListener() {
@ -75,11 +106,23 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
}
@Before
public void beforeReset() {
public void beforeReset() throws Exception {
ourCreatedObservations.clear();
ourUpdatedObservations.clear();
ourContentTypes.clear();
ourHeaders.clear();
// Delete all Subscriptions
Bundle allSubscriptions = myClient.search().forResource(Subscription.class).returnBundle(Bundle.class).execute();
for (IBaseResource next : BundleUtil.toListOfResources(myFhirCtx, allSubscriptions)) {
myClient.delete().resource(next).execute();
}
waitForRegisteredSubscriptionCount(0);
ExecutorSubscribableChannel processingChannel = (ExecutorSubscribableChannel) getRestHookSubscriptionInterceptor().getProcessingChannel();
processingChannel.setInterceptors(new ArrayList<>());
myCountingInterceptor = new CountingInterceptor();
processingChannel.addInterceptor(myCountingInterceptor);
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
@ -97,7 +140,6 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
subscription.setId(methodOutcome.getId().getIdPart());
mySubscriptionIds.add(methodOutcome.getId());
waitForQueueToDrain();
return subscription;
}
@ -129,6 +171,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(2);
sendObservation(code, "SNOMED-CT");
@ -149,6 +192,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
createSubscription(criteria1, payload, ourListenerServerBase);
createSubscription(criteria2, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(2);
Observation obs = sendObservation(code, "SNOMED-CT");
@ -169,12 +213,8 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
waitForSize(1, ourUpdatedObservations);
}
@Test
public void testRestHookSubscriptionApplicationJsonDisableVersionIdInDelivery() throws Exception {
String payload = "application/json";
@ -182,16 +222,23 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
String code = "1000000050";
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
waitForRegisteredSubscriptionCount(0);
Subscription subscription1 = createSubscription(criteria1, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(1);
int modCount = myCountingInterceptor.getSentCount();
subscription1
.getChannel()
.addExtension(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_STRIP_VERSION_IDS, new BooleanType("true"));
subscription1
.getChannel()
.addExtension(JpaConstants.EXT_SUBSCRIPTION_RESTHOOK_DELIVER_LATEST_VERSION, new BooleanType("true"));
ourLog.info("** About to update subscription");
myClient.update().resource(subscription1).execute();
waitForQueueToDrain();
waitForSize(modCount + 1, ()->myCountingInterceptor.getSentCount());
Thread.sleep(4000);
ourLog.info("** About to send observation");
Observation observation1 = sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
@ -204,8 +251,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
assertEquals(null, ourUpdatedObservations.get(0).getIdElement().getVersionIdPart());
}
@Test
@Test
public void testRestHookSubscriptionApplicationJson() throws Exception {
String payload = "application/json";
@ -215,6 +261,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
Subscription subscription1 = createSubscription(criteria1, payload, ourListenerServerBase);
Subscription subscription2 = createSubscription(criteria2, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(2);
Observation observation1 = sendObservation(code, "SNOMED-CT");
@ -292,11 +339,12 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
Subscription subscription1 = createSubscription(criteria1, payload, ourListenerServerBase);
Subscription subscription2 = createSubscription(criteria2, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(2);
ourLog.info("** About to send obervation");
Observation observation1 = sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
waitForQueueToDrain();
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
assertEquals(Constants.CT_FHIR_XML_NEW, ourContentTypes.get(0));
@ -401,7 +449,6 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
assertEquals(1, ourUpdatedObservations.size());
}
@Test
public void testRestHookSubscriptionApplicationXmlJson() throws Exception {
String payload = "application/fhir+xml";
@ -412,6 +459,7 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
Subscription subscription1 = createSubscription(criteria1, payload, ourListenerServerBase);
Subscription subscription2 = createSubscription(criteria2, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(2);
Observation observation1 = sendObservation(code, "SNOMED-CT");
@ -447,6 +495,8 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
// Add some headers, and we'll also turn back to requested status for fun
Subscription subscription = createSubscription(criteria1, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(1);
subscription.getChannel().addHeader("X-Foo: FOO");
subscription.getChannel().addHeader("X-Bar: BAR");
subscription.setStatus(Subscription.SubscriptionStatus.REQUESTED);
@ -472,6 +522,8 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
String criteria1 = "Observation?code=SNOMED-CT|" + code + "&_format=xml";
Subscription subscription = createSubscription(criteria1, payload, ourListenerServerBase);
waitForRegisteredSubscriptionCount(1);
sendObservation(code, "SNOMED-CT");
// Should see 1 subscription notification
@ -498,33 +550,6 @@ public class RestHookTestR4Test extends BaseResourceProviderR4Test {
RestHookTestDstu2Test.waitForQueueToDrain(getRestHookSubscriptionInterceptor());
}
@BeforeClass
public static void startListenerServer() throws Exception {
ourListenerPort = PortUtil.findFreePort();
ourListenerRestServer = new RestfulServer(FhirContext.forR4());
ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context";
ObservationListener obsListener = new ObservationListener();
ourListenerRestServer.setResourceProviders(obsListener);
ourListenerServer = new Server(ourListenerPort);
ServletContextHandler proxyHandler = new ServletContextHandler();
proxyHandler.setContextPath("/");
ServletHolder servletHolder = new ServletHolder();
servletHolder.setServlet(ourListenerRestServer);
proxyHandler.addServlet(servletHolder, "/fhir/context/*");
ourListenerServer.setHandler(proxyHandler);
ourListenerServer.start();
}
@AfterClass
public static void stopListenerServer() throws Exception {
ourListenerServer.stop();
}
public static class ObservationListener implements IResourceProvider {
@Create