ready for review

This commit is contained in:
Ken Stevens 2019-02-02 18:03:09 -05:00
parent dd0b56142d
commit 4106d67eb7
4 changed files with 41 additions and 21 deletions

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.standalone;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessag
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -61,15 +62,37 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
}
public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) {
IBaseResource resource = theResourceModifiedMessage.getNewPayload(myFhirContext);
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource);
if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) {
String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource);
if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
}
if (isSubscription(theResourceModifiedMessage)) {
handleSubscriptionActivation(theResourceModifiedMessage);
}
mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage);
}
private void handleSubscriptionActivation(ResourceModifiedMessage theResourceModifiedMessage) {
switch (theResourceModifiedMessage.getOperationType()) {
case DELETE:
mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext));
break;
case CREATE:
case UPDATE:
registerActiveSubscription(theResourceModifiedMessage.getNewPayload(myFhirContext));
break;
default:
break;
}
}
private boolean isSubscription(ResourceModifiedMessage theResourceModifiedMessage) {
IIdType id = theResourceModifiedMessage.getId(myFhirContext);
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(id.getResourceType());
return resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode());
}
private void registerActiveSubscription(IBaseResource theSubscription) {
String status = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
}
}
}

View File

@ -85,6 +85,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
@After
public void cleanup() {
myInterceptorRegistry.clearAnonymousHookForUnitTest();
mySubscriptionMatchingPost.clear();
mySubscriptionActivatedPost.clear();
ourObservationListener.clear();
}
public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException {
@ -184,5 +187,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public void expectNothing() {
updateLatch.expectNothing();
}
public void clear() { updateLatch.clear();}
}
}

View File

@ -32,6 +32,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
}
@ -42,6 +43,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
}
@ -52,6 +54,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.DELETE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
}

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
@ -9,17 +8,12 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
@Test
public void testSubscriptionLoaderFhirClient() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown());
String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
@ -33,7 +27,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
initSubscriptionLoader(bundle);
sendObservation(myCode, "SNOMED-CT");
latch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations);
@ -42,9 +35,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
@Test
public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown());
String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
@ -58,7 +48,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
initSubscriptionLoader(bundle);
sendObservation(myCode, "SNOMED-CT");
latch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations);