Merge pull request #1191 from jamesagnew/standalone-subscription-delete-fail

Standalone subscription delete fail
This commit is contained in:
James Agnew 2019-02-03 16:32:57 -05:00 committed by GitHub
commit b928af7bc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 35 deletions

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription; import org.hl7.fhir.r4.model.Subscription;
@ -357,4 +358,20 @@ public class CanonicalSubscription implements Serializable, Cloneable {
} }
@Override
public String toString() {
return new ToStringBuilder(this)
.append("myIdElement", myIdElement)
.append("myStatus", myStatus)
.append("myCriteriaString", myCriteriaString)
.append("myEndpointUrl", myEndpointUrl)
.append("myPayloadString", myPayloadString)
// .append("myHeaders", myHeaders)
.append("myChannelType", myChannelType)
// .append("myTrigger", myTrigger)
// .append("myEmailDetails", myEmailDetails)
// .append("myRestHookDetails", myRestHookDetails)
// .append("myChannelExtensions", myChannelExtensions)
.toString();
}
} }

View File

@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
* #L% * #L%
*/ */
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import java.util.ArrayList; import java.util.ArrayList;
@ -70,9 +69,4 @@ class ActiveSubscriptionCache {
} }
} }
} }
@VisibleForTesting
void clearForUnitTests() {
myCache.clear();
}
} }

View File

@ -24,7 +24,6 @@ import ca.uhn.fhir.jpa.model.entity.ModelConfig;
import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorBroadcaster; import ca.uhn.fhir.jpa.model.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
@ -113,7 +112,7 @@ public class SubscriptionRegistry {
} }
@PreDestroy @PreDestroy
public void preDestroy() { public void unregisterAllSubscriptions() {
unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
} }
@ -156,9 +155,4 @@ public class SubscriptionRegistry {
public int size() { public int size() {
return myActiveSubscriptionCache.size(); return myActiveSubscriptionCache.size();
} }
@VisibleForTesting
public void clearForUnitTests() {
myActiveSubscriptionCache.clearForUnitTests();
}
} }

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.subscription.module.standalone;
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -30,6 +30,7 @@ import ca.uhn.fhir.jpa.subscription.module.subscriber.ResourceModifiedJsonMessag
import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber; import ca.uhn.fhir.jpa.subscription.module.subscriber.SubscriptionMatchingSubscriber;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -61,15 +62,35 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
} }
public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) { public void updateSubscriptionRegistryAndPerformMatching(ResourceModifiedMessage theResourceModifiedMessage) {
IBaseResource resource = theResourceModifiedMessage.getNewPayload(myFhirContext); switch (theResourceModifiedMessage.getOperationType()) {
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(resource); case DELETE:
if (isSubscription(theResourceModifiedMessage)) {
if (resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode())) { mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext));
String status = mySubscriptionCanonicalizer.getSubscriptionStatus(resource); }
if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) { return;
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); case CREATE:
} case UPDATE:
if (isSubscription(theResourceModifiedMessage)) {
registerActiveSubscription(theResourceModifiedMessage.getNewPayload(myFhirContext));
}
break;
default:
break;
} }
mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage); mySubscriptionMatchingSubscriber.matchActiveSubscriptionsAndDeliver(theResourceModifiedMessage);
} }
private boolean isSubscription(ResourceModifiedMessage theResourceModifiedMessage) {
IIdType id = theResourceModifiedMessage.getId(myFhirContext);
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(id.getResourceType());
return resourceDef.getName().equals(ResourceTypeEnum.SUBSCRIPTION.getCode());
}
private void registerActiveSubscription(IBaseResource theSubscription) {
String status = mySubscriptionCanonicalizer.getSubscriptionStatus(theSubscription);
if (SubscriptionConstants.ACTIVE_STATUS.equals(status)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
}
}
} }

View File

@ -73,7 +73,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
ourCreatedObservations.clear(); ourCreatedObservations.clear();
ourUpdatedObservations.clear(); ourUpdatedObservations.clear();
ourContentTypes.clear(); ourContentTypes.clear();
mySubscriptionRegistry.clearForUnitTests(); mySubscriptionRegistry.unregisterAllSubscriptions();
if (ourSubscribableChannel == null) { if (ourSubscribableChannel == null) {
ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase()); ourSubscribableChannel = mySubscriptionChannelFactory.newDeliveryChannel("test", Subscription.SubscriptionChannelType.RESTHOOK.toCode().toLowerCase());
ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler); ourSubscribableChannel.subscribe(myStandaloneSubscriptionMessageHandler);
@ -85,6 +85,9 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
@After @After
public void cleanup() { public void cleanup() {
myInterceptorRegistry.clearAnonymousHookForUnitTest(); myInterceptorRegistry.clearAnonymousHookForUnitTest();
mySubscriptionMatchingPost.clear();
mySubscriptionActivatedPost.clear();
ourObservationListener.clear();
} }
public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException { public <T extends IBaseResource> T sendResource(T theResource) throws InterruptedException {
@ -184,5 +187,7 @@ public abstract class BaseBlockingQueueSubscribableChannelDstu3Test extends Base
public void expectNothing() { public void expectNothing() {
updateLatch.expectNothing(); updateLatch.expectNothing();
} }
public void clear() { updateLatch.clear();}
} }
} }

View File

@ -32,6 +32,7 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionRegistry).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
} }
@ -42,7 +43,19 @@ public class StandaloneSubscriptionMessageHandlerTest extends BaseSubscriptionDs
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.CREATE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message); ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage); myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry, never()).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any()); Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any()); Mockito.verify(mySubscriptionMatchingSubscriber).matchActiveSubscriptionsAndDeliver(any());
} }
@Test
public void deleteSubscription() {
Subscription subscription = makeSubscriptionWithStatus("testCriteria", "testPayload", "testEndpoint", Subscription.SubscriptionStatus.REQUESTED);
ResourceModifiedMessage message = new ResourceModifiedMessage(myFhirContext, subscription, ResourceModifiedMessage.OperationTypeEnum.DELETE);
ResourceModifiedJsonMessage jsonMessage = new ResourceModifiedJsonMessage(message);
myStandaloneSubscriptionMessageHandler.handleMessage(jsonMessage);
Mockito.verify(mySubscriptionRegistry).unregisterSubscription(any());
Mockito.verify(mySubscriptionRegistry, never()).registerSubscriptionUnlessAlreadyRegistered(any());
Mockito.verify(mySubscriptionMatchingSubscriber, never()).matchActiveSubscriptionsAndDeliver(any());
}
} }

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.subscription.module.standalone; package ca.uhn.fhir.jpa.subscription.module.standalone;
import ca.uhn.fhir.jpa.model.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.IBundleProvider; import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.server.SimpleBundleProvider; import ca.uhn.fhir.rest.server.SimpleBundleProvider;
@ -9,17 +8,12 @@ import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test { public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscribableChannelDstu3Test {
@Test @Test
public void testSubscriptionLoaderFhirClient() throws InterruptedException { public void testSubscriptionLoaderFhirClient() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown());
String payload = "application/fhir+json"; String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
@ -33,7 +27,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
initSubscriptionLoader(bundle); initSubscriptionLoader(bundle);
sendObservation(myCode, "SNOMED-CT"); sendObservation(myCode, "SNOMED-CT");
latch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedObservations);
waitForSize(1, ourUpdatedObservations); waitForSize(1, ourUpdatedObservations);
@ -42,9 +35,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
@Test @Test
public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws InterruptedException { public void testSubscriptionLoaderFhirClientSubscriptionNotActive() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
myInterceptorRegistry.registerAnonymousHookForUnitTest(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, t-> latch.countDown());
String payload = "application/fhir+json"; String payload = "application/fhir+json";
String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml"; String criteria1 = "Observation?code=SNOMED-CT|" + myCode + "&_format=xml";
@ -58,7 +48,6 @@ public class SubscriptionLoaderFhirClientTest extends BaseBlockingQueueSubscriba
initSubscriptionLoader(bundle); initSubscriptionLoader(bundle);
sendObservation(myCode, "SNOMED-CT"); sendObservation(myCode, "SNOMED-CT");
latch.await(10, TimeUnit.SECONDS);
waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedObservations);
waitForSize(0, ourUpdatedObservations); waitForSize(0, ourUpdatedObservations);