diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java index b65bf6f5da3..e19bd4cb94a 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCache.java @@ -20,16 +20,11 @@ package ca.uhn.fhir.jpa.subscription.module.cache; * #L% */ -import com.google.common.collect.Multimap; -import com.google.common.collect.MultimapBuilder; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; class ActiveSubscriptionCache { @@ -65,7 +60,8 @@ class ActiveSubscriptionCache { return activeSubscription; } - public void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { + List markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection theAllIds) { + List retval = new ArrayList<>(); for (String next : new ArrayList<>(myCache.keySet())) { ActiveSubscription activeSubscription = myCache.get(next); if (theAllIds.contains(next)) { @@ -74,11 +70,12 @@ class ActiveSubscriptionCache { } else { if (activeSubscription.isFlagForDeletion()) { ourLog.info("Unregistering Subscription/{}", next); - remove(next); + retval.add(next); } else { activeSubscription.setFlagForDeletion(true); } } } + return retval; } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java index 6121045b059..64a67011eb3 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/cache/SubscriptionRegistry.java @@ -36,6 +36,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -104,12 +105,11 @@ public class SubscriptionRegistry { return canonicalized; } - public void unregisterSubscription(IIdType theId) { - Validate.notNull(theId); - String subscriptionId = theId.getIdPart(); + public void unregisterSubscription(String theSubscriptionId) { + Validate.notNull(theSubscriptionId); - ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue()); - ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(subscriptionId); + ourLog.info("Unregistering active subscription {}", theSubscriptionId); + ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId); if (activeSubscription != null) { mySubscriptionChannelRegistry.remove(activeSubscription); } @@ -124,7 +124,11 @@ public class SubscriptionRegistry { } void unregisterAllSubscriptionsNotInCollection(Collection theAllIds) { - myActiveSubscriptionCache.unregisterAllSubscriptionsNotInCollection(theAllIds); + + List idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); + for (String id : idsToDelete) { + unregisterSubscription(id); + } } public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { @@ -142,7 +146,7 @@ public class SubscriptionRegistry { updateSubscription(theSubscription); return true; } - unregisterSubscription(theSubscription.getIdElement()); + unregisterSubscription(theSubscription.getIdElement().getIdPart()); } if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { registerSubscription(theSubscription.getIdElement(), theSubscription); @@ -174,7 +178,7 @@ public class SubscriptionRegistry { public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { if (hasSubscription(theSubscription.getIdElement()).isPresent()) { ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue()); - unregisterSubscription(theSubscription.getIdElement()); + unregisterSubscription(theSubscription.getIdElement().getIdPart()); return true; } return false; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java index 88788d77db5..f01b41895b8 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/channel/SubscriptionChannelRegistry.java @@ -60,4 +60,8 @@ public class SubscriptionChannelRegistry { public SubscriptionChannelWithHandlers get(String theChannelName) { return mySubscriptionChannelCache.get(theChannelName); } + + public int size() { + return mySubscriptionChannelCache.size(); + } } diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java index bd5605beb4a..1d3d05ab409 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/module/standalone/StandaloneSubscriptionMessageHandler.java @@ -65,7 +65,7 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler { switch (theResourceModifiedMessage.getOperationType()) { case DELETE: if (isSubscription(theResourceModifiedMessage)) { - mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext)); + mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext).getIdPart()); } return; case CREATE: diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java index 9cccebca632..f9fc7af1a40 100644 --- a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/module/cache/ActiveSubscriptionCacheTest.java @@ -5,6 +5,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.*; public class ActiveSubscriptionCacheTest { @@ -17,12 +18,13 @@ public class ActiveSubscriptionCacheTest { assertFalse(activeSub1.isFlagForDeletion()); List saveIds = new ArrayList<>(); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + List idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertTrue(activeSub1.isFlagForDeletion()); assertNotNull(activeSubscriptionCache.get(id1)); + assertEquals(0, idsToDelete.size()); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); - assertNull(activeSubscriptionCache.get(id1)); + idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); + assertThat(idsToDelete, containsInAnyOrder(id1)); } @Test @@ -35,14 +37,15 @@ public class ActiveSubscriptionCacheTest { assertFalse(activeSub1.isFlagForDeletion()); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + List idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertTrue(activeSub1.isFlagForDeletion()); assertNotNull(activeSubscriptionCache.get(id1)); + assertEquals(0, idsToDelete.size()); saveIds.add(id1); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertFalse(activeSub1.isFlagForDeletion()); - assertNotNull(activeSubscriptionCache.get(id1)); + assertEquals(0, idsToDelete.size()); } @Test @@ -58,9 +61,9 @@ public class ActiveSubscriptionCacheTest { activeSub1.setFlagForDeletion(true); List saveIds = new ArrayList<>(); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + List idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); - assertNull(activeSubscriptionCache.get(id1)); + assertThat(idsToDelete, containsInAnyOrder(id1)); assertNotNull(activeSubscriptionCache.get(id2)); assertTrue(activeSub2.isFlagForDeletion()); } @@ -80,7 +83,7 @@ public class ActiveSubscriptionCacheTest { saveIds.add(id1); saveIds.add(id2); - activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds); + activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds); assertNotNull(activeSubscriptionCache.get(id1)); assertFalse(activeSub1.isFlagForDeletion());