Work on subscription cleanup

This commit is contained in:
jamesagnew 2020-04-05 11:34:47 -04:00
parent 0a28c0c060
commit d0884663d2
4 changed files with 33 additions and 32 deletions

View File

@ -20,7 +20,6 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
* #L% * #L%
*/ */
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.api.config.DaoConfig; import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry; import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
@ -31,7 +30,6 @@ import ca.uhn.fhir.jpa.subscription.process.matcher.matching.SubscriptionStrateg
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionConstants;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.model.dstu2.valueset.ResourceTypeEnum;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil; import ca.uhn.fhir.util.SubscriptionUtil;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
@ -41,10 +39,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
@ -113,12 +107,15 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) { if (SubscriptionConstants.REQUESTED_STATUS.equals(statusString)) {
return activateSubscription(theSubscription); return activateSubscription(theSubscription);
} else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) { // FIXME: remove
return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription); // } else if (SubscriptionConstants.ACTIVE_STATUS.equals(statusString)) {
} else { // return mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(theSubscription);
// Status isn't "active" or "requested" // } else {
return mySubscriptionRegistry.unregisterSubscriptionIfRegistered(theSubscription, statusString); // Status isn't "active" or "requested"
// return mySubscriptionRegistry.unregisterSubscriptionIfRegistered(theSubscription, statusString);
} }
return false;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -141,12 +138,6 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
} }
} }
private boolean isSubscription(IBaseResource theNewResource) {
RuntimeResourceDefinition resourceDefinition = myFhirContext.getResourceDefinition(theNewResource);
return ResourceTypeEnum.SUBSCRIPTION.getCode().equals(resourceDefinition.getName());
}
private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) { private void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
activateOrRegisterSubscriptionIfRequired(theSubscription); activateOrRegisterSubscriptionIfRequired(theSubscription);
} }

View File

@ -22,8 +22,10 @@ package ca.uhn.fhir.jpa.subscription.process.matcher.subscriber;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionCanonicalizer;
import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.process.registry.SubscriptionRegistry;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -45,6 +47,8 @@ public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscrip
private FhirContext myFhirContext; private FhirContext myFhirContext;
@Autowired @Autowired
private SubscriptionRegistry mySubscriptionRegistry; private SubscriptionRegistry mySubscriptionRegistry;
@Autowired
private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
/** /**
* Constructor * Constructor
@ -68,11 +72,17 @@ public class SubscriptionRegisteringSubscriber extends BaseSubscriberForSubscrip
switch (payload.getOperationType()) { switch (payload.getOperationType()) {
case DELETE: case DELETE:
mySubscriptionRegistry.unregisterSubscription(payload.getId(myFhirContext).getIdPart()); mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payload.getId(myFhirContext).getIdPart());
break; break;
case CREATE: case CREATE:
case UPDATE: case UPDATE:
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payload.getNewPayload(myFhirContext)); IBaseResource subscription = payload.getNewPayload(myFhirContext);
String statusString = mySubscriptionCanonicalizer.getSubscriptionStatus(subscription);
if ("active".equals(statusString)) {
mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(payload.getNewPayload(myFhirContext));
} else {
mySubscriptionRegistry.unregisterSubscriptionIfRegistered(payload.getId(myFhirContext).getIdPart());
}
break; break;
case MANUALLY_TRIGGERED: case MANUALLY_TRIGGERED:
default: default:

View File

@ -104,7 +104,7 @@ public class SubscriptionRegistry {
return canonicalized; return canonicalized;
} }
public void unregisterSubscription(String theSubscriptionId) { public void unregisterSubscriptionIfRegistered(String theSubscriptionId) {
Validate.notNull(theSubscriptionId); Validate.notNull(theSubscriptionId);
ourLog.info("Unregistering active subscription {}", theSubscriptionId); ourLog.info("Unregistering active subscription {}", theSubscriptionId);
@ -126,7 +126,7 @@ public class SubscriptionRegistry {
List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
for (String id : idsToDelete) { for (String id : idsToDelete) {
unregisterSubscription(id); unregisterSubscriptionIfRegistered(id);
} }
} }
@ -145,7 +145,7 @@ public class SubscriptionRegistry {
updateSubscription(theSubscription); updateSubscription(theSubscription);
return true; return true;
} }
unregisterSubscription(theSubscription.getIdElement().getIdPart()); unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
} }
if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
registerSubscription(theSubscription.getIdElement(), theSubscription); registerSubscription(theSubscription.getIdElement(), theSubscription);
@ -177,7 +177,7 @@ public class SubscriptionRegistry {
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
if (hasSubscription(theSubscription.getIdElement()).isPresent()) { if (hasSubscription(theSubscription.getIdElement()).isPresent()) {
ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
unregisterSubscription(theSubscription.getIdElement().getIdPart()); unregisterSubscriptionIfRegistered(theSubscription.getIdElement().getIdPart());
return true; return true;
} }
return false; return false;

View File

@ -44,7 +44,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private static final int DEFAULT_TIMEOUT_SECONDS = 10; private static final int DEFAULT_TIMEOUT_SECONDS = 10;
private static final FhirObjectPrinter ourFhirObjectToStringMapper = new FhirObjectPrinter(); private static final FhirObjectPrinter ourFhirObjectToStringMapper = new FhirObjectPrinter();
private final String name; private final String myName;
private final AtomicLong myLastInvoke = new AtomicLong(); private final AtomicLong myLastInvoke = new AtomicLong();
private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>(); private final AtomicReference<CountDownLatch> myCountdownLatch = new AtomicReference<>();
private final AtomicReference<List<String>> myFailures = new AtomicReference<>(); private final AtomicReference<List<String>> myFailures = new AtomicReference<>();
@ -54,13 +54,13 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
private int myInitialCount; private int myInitialCount;
private boolean myExactMatch; private boolean myExactMatch;
public PointcutLatch(Pointcut thePointcut) { public PointcutLatch(Pointcut thePointcut) {
this.name = thePointcut.name(); this.myName = thePointcut.name();
myPointcut = thePointcut; myPointcut = thePointcut;
} }
public PointcutLatch(String theName) { public PointcutLatch(String theName) {
this.name = theName; this.myName = theName;
myPointcut = null; myPointcut = null;
} }
@ -85,9 +85,9 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
myExactMatch = theExactMatch; myExactMatch = theExactMatch;
createLatch(theCount); createLatch(theCount);
if (theExactMatch) { if (theExactMatch) {
ourLog.info("Expecting exactly {} calls to {} latch", theCount, name); ourLog.info("Expecting exactly {} calls to {} latch", theCount, myName);
} else { } else {
ourLog.info("Expecting at least {} calls to {} latch", theCount, name); ourLog.info("Expecting at least {} calls to {} latch", theCount, myName);
} }
} }
@ -111,7 +111,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
} }
private String getName() { private String getName() {
return name + " " + this.getClass().getSimpleName(); return myName + " " + this.getClass().getSimpleName();
} }
@Override @Override
@ -175,7 +175,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
CountDownLatch latch = myCountdownLatch.get(); CountDownLatch latch = myCountdownLatch.get();
if (myExactMatch) { if (myExactMatch) {
if (latch == null) { if (latch == null) {
throw new PointcutLatchException("invoke() called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived.", theArgs); throw new PointcutLatchException("invoke() for " + myName + " called outside of setExpectedCount() .. awaitExpected(). Probably got more invocations than expected or clear() was called before invoke() arrived with args: " + theArgs, theArgs);
} else if (latch.getCount() <= 0) { } else if (latch.getCount() <= 0) {
addFailure("invoke() called when countdown was zero."); addFailure("invoke() called when countdown was zero.");
} }
@ -186,7 +186,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
if (myCalledWith.get() != null) { if (myCalledWith.get() != null) {
myCalledWith.get().add(theArgs); myCalledWith.get().add(theArgs);
} }
ourLog.info("Called {} {} with {}", name, latch, hookParamsToString(theArgs)); ourLog.info("Called {} {} with {}", myName, latch, hookParamsToString(theArgs));
latch.countDown(); latch.countDown();
} }
@ -198,7 +198,7 @@ public class PointcutLatch implements IAnonymousInterceptor, IPointcutLatch {
@Override @Override
public String toString() { public String toString() {
return new ToStringBuilder(this) return new ToStringBuilder(this)
.append("name", name) .append("name", myName)
.append("myCountdownLatch", myCountdownLatch) .append("myCountdownLatch", myCountdownLatch)
// .append("myFailures", myFailures) // .append("myFailures", myFailures)
// .append("myCalledWith", myCalledWith) // .append("myCalledWith", myCalledWith)