From d0884663d27024bdd681bdc8a3e9ec32fc4d19f0 Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Sun, 5 Apr 2020 11:34:47 -0400 Subject: [PATCH] Work on subscription cleanup --- .../SubscriptionActivatingSubscriber.java | 25 ++++++------------- .../SubscriptionRegisteringSubscriber.java | 14 +++++++++-- .../registry/SubscriptionRegistry.java | 8 +++--- .../uhn/test/concurrency/PointcutLatch.java | 18 ++++++------- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java index f442c873211..a40495af6c0 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionActivatingSubscriber.java @@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber; * #L% */ -import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; @@ -31,7 +30,6 @@ import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrateg import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; -import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.SubscriptionUtil; import org.hl7.fhir.instance.model.api.IBaseResource; @@ -41,10 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.Nonnull; @@ -113,12 +107,15 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) { return activateSubscription(theSubscription); - } else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) { - return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); - } else { - // Status isn't "active" or "requested" - return mySubscriptionRegistry.unregisterSubscriptionIfRegistered(theSubscription, statusString); + // FIXME: remove +// } else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) { +// return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); +// } else { +// Status isn't "active" or "requested" +// return mySubscriptionRegistry.unregisterSubscriptionIfRegistered(theSubscription, statusString); } + + return false; } @SuppressWarnings("unchecked") @@ -141,12 +138,6 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript } } - - private boolean isSubscription(IBaseResource theNewResource) { - RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(theNewResource); - return ResourceTypeEnum.SUBSCRIPTION.getCode().equals(resourceDefinition.getName()); - } - private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) { activateOrRegisterSubscriptionIfRequired(theSubscription); } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionRegisteringSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionRegisteringSubscriber.java index 9c6eabdae79..cd64a0b07b2 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionRegisteringSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/matcher/subscriber/SubscriptionRegisteringSubscriber.java @@ -22,8 +22,10 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -45,6 +47,8 @@ public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscrip private FhirContext myFhirContext; @Autowired private SubscriptionRegistry mySubscriptionRegistry; + @Autowired + private SubscriptionCanonicalizer mySubscriptionCanonicalizer; /** * Constructor @@ -68,11 +72,17 @@ public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscrip switch (payload.getOperationType()) { case DELETE: - mySubscriptionRegistry.unregisterSubscription(payload.getId(myFhirContext).getIdPart()); + mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payload.getId(myFhirContext).getIdPart()); break; case CREATE: case UPDATE: - mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payload.getNewPayload(myFhirContext)); + IBaseResource subscription = payload.getNewPayload(myFhirContext); + String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(subscription); + if ("active".equals(statusString)) { + mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payload.getNewPayload(myFhirContext)); + } else { + mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payload.getId(myFhirContext).getIdPart()); + } break; case MANUALLY_TRIGGERED: default: diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionRegistry.java index 26edf6f9a44..0a03f8b11a3 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/process/registry/SubscriptionRegistry.java @@ -104,7 +104,7 @@ public class SubscriptionRegistry { return canonicalized; } - public void unregisterSubscription(String theSubscriptionId) { + public void unregisterSubscriptionIfRegistered(String theSubscriptionId) { Validate.notNull(theSubscriptionId); ourLog.info("Unregistering active subscription {}", theSubscriptionId); @@ -126,7 +126,7 @@ public class SubscriptionRegistry { List idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); for (String id : idsToDelete) { - unregisterSubscription(id); + unregisterSubscriptionIfRegistered(id); } } @@ -145,7 +145,7 @@ public class SubscriptionRegistry { updateSubscription(theSubscription); return true; } - unregisterSubscription(theSubscription.getIdElement().getIdPart()); + unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart()); } if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { registerSubscription(theSubscription.getIdElement(), theSubscription); @@ -177,7 +177,7 @@ public class SubscriptionRegistry { public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { if (hasSubscription(theSubscription.getIdElement()).isPresent()) { ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue()); - unregisterSubscription(theSubscription.getIdElement().getIdPart()); + unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart()); return true; } return false; diff --git a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java index 080b9238903..ed50d5f9469 100644 --- a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java +++ b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/PointcutLatch.java @@ -44,7 +44,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { private static final int DEFAULT_TIMEOUT_SECONDS = 10; private static final FhirObjectPrinter ourFhirObjectToStringMapper = new FhirObjectPrinter(); - private final String name; + private final String myName; private final AtomicLong myLastInvoke = new AtomicLong(); private final AtomicReference myCountdownLatch = new AtomicReference<>(); private final AtomicReference> myFailures = new AtomicReference<>(); @@ -54,13 +54,13 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { private int myInitialCount; private boolean myExactMatch; public PointcutLatch(Pointcut thePointcut) { - this.name = thePointcut.name(); + this.myName = thePointcut.name(); myPointcut = thePointcut; } public PointcutLatch(String theName) { - this.name = theName; + this.myName = theName; myPointcut = null; } @@ -85,9 +85,9 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { myExactMatch = theExactMatch; createLatch(theCount); if (theExactMatch) { - ourLog.info("Expecting exactly {} calls to {} latch", theCount, name); + ourLog.info("Expecting exactly {} calls to {} latch", theCount, myName); } else { - ourLog.info("Expecting at least {} calls to {} latch", theCount, name); + ourLog.info("Expecting at least {} calls to {} latch", theCount, myName); } } @@ -111,7 +111,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { } private String getName() { - return name + " " + this.getClass().getSimpleName(); + return myName + " " + this.getClass().getSimpleName(); } @Override @@ -175,7 +175,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { CountDownLatch latch = myCountdownLatch.get(); if (myExactMatch) { if (latch == null) { - throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs); + throw new PointcutLatchException("invoke() for " + myName + " called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived with args: " + theArgs, theArgs); } else if (latch.getCount() <= 0) { addFailure("invoke() called when countdown was zero."); } @@ -186,7 +186,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { if (myCalledWith.get() != null) { myCalledWith.get().add(theArgs); } - ourLog.info("Called {} {} with {}", name, latch, hookParamsToString(theArgs)); + ourLog.info("Called {} {} with {}", myName, latch, hookParamsToString(theArgs)); latch.countDown(); } @@ -198,7 +198,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch { @Override public String toString() { return new ToStringBuilder(this) - .append("name", name) + .append("name", myName) .append("myCountdownLatch", myCountdownLatch) // .append("myFailures", myFailures) // .append("myCalledWith", myCalledWith)