fix synchronization channel removal
This commit is contained in:
parent
a9f83a8c43
commit
6196f528d9
|
@ -20,16 +20,11 @@ package ca.uhn.fhir.jpa.subscription.module.cache;
|
||||||
* #L%
|
* #L%
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import com.google.common.collect.Multimap;
|
|
||||||
import com.google.common.collect.MultimapBuilder;
|
|
||||||
import org.apache.commons.lang3.Validate;
|
import org.apache.commons.lang3.Validate;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
class ActiveSubscriptionCache {
|
class ActiveSubscriptionCache {
|
||||||
|
@ -65,7 +60,8 @@ class ActiveSubscriptionCache {
|
||||||
return activeSubscription;
|
return activeSubscription;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
|
List<String> markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(Collection<String> theAllIds) {
|
||||||
|
List<String> retval = new ArrayList<>();
|
||||||
for (String next : new ArrayList<>(myCache.keySet())) {
|
for (String next : new ArrayList<>(myCache.keySet())) {
|
||||||
ActiveSubscription activeSubscription = myCache.get(next);
|
ActiveSubscription activeSubscription = myCache.get(next);
|
||||||
if (theAllIds.contains(next)) {
|
if (theAllIds.contains(next)) {
|
||||||
|
@ -74,11 +70,12 @@ class ActiveSubscriptionCache {
|
||||||
} else {
|
} else {
|
||||||
if (activeSubscription.isFlagForDeletion()) {
|
if (activeSubscription.isFlagForDeletion()) {
|
||||||
ourLog.info("Unregistering Subscription/{}", next);
|
ourLog.info("Unregistering Subscription/{}", next);
|
||||||
remove(next);
|
retval.add(next);
|
||||||
} else {
|
} else {
|
||||||
activeSubscription.setFlagForDeletion(true);
|
activeSubscription.setFlagForDeletion(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return retval;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.springframework.stereotype.Component;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,12 +105,11 @@ public class SubscriptionRegistry {
|
||||||
return canonicalized;
|
return canonicalized;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterSubscription(IIdType theId) {
|
public void unregisterSubscription(String theSubscriptionId) {
|
||||||
Validate.notNull(theId);
|
Validate.notNull(theSubscriptionId);
|
||||||
String subscriptionId = theId.getIdPart();
|
|
||||||
|
|
||||||
ourLog.info("Unregistering active subscription {}", theId.toUnqualified().getValue());
|
ourLog.info("Unregistering active subscription {}", theSubscriptionId);
|
||||||
ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(subscriptionId);
|
ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
|
||||||
if (activeSubscription != null) {
|
if (activeSubscription != null) {
|
||||||
mySubscriptionChannelRegistry.remove(activeSubscription);
|
mySubscriptionChannelRegistry.remove(activeSubscription);
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,11 @@ public class SubscriptionRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
|
void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
|
||||||
myActiveSubscriptionCache.unregisterAllSubscriptionsNotInCollection(theAllIds);
|
|
||||||
|
List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
|
||||||
|
for (String id : idsToDelete) {
|
||||||
|
unregisterSubscription(id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
|
public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
|
||||||
|
@ -142,7 +146,7 @@ public class SubscriptionRegistry {
|
||||||
updateSubscription(theSubscription);
|
updateSubscription(theSubscription);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
unregisterSubscription(theSubscription.getIdElement());
|
unregisterSubscription(theSubscription.getIdElement().getIdPart());
|
||||||
}
|
}
|
||||||
if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
|
if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
|
||||||
registerSubscription(theSubscription.getIdElement(), theSubscription);
|
registerSubscription(theSubscription.getIdElement(), theSubscription);
|
||||||
|
@ -174,7 +178,7 @@ public class SubscriptionRegistry {
|
||||||
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
|
public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
|
||||||
if (hasSubscription(theSubscription.getIdElement()).isPresent()) {
|
if (hasSubscription(theSubscription.getIdElement()).isPresent()) {
|
||||||
ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
|
ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
|
||||||
unregisterSubscription(theSubscription.getIdElement());
|
unregisterSubscription(theSubscription.getIdElement().getIdPart());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -60,4 +60,8 @@ public class SubscriptionChannelRegistry {
|
||||||
public SubscriptionChannelWithHandlers get(String theChannelName) {
|
public SubscriptionChannelWithHandlers get(String theChannelName) {
|
||||||
return mySubscriptionChannelCache.get(theChannelName);
|
return mySubscriptionChannelCache.get(theChannelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
return mySubscriptionChannelCache.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class StandaloneSubscriptionMessageHandler implements MessageHandler {
|
||||||
switch (theResourceModifiedMessage.getOperationType()) {
|
switch (theResourceModifiedMessage.getOperationType()) {
|
||||||
case DELETE:
|
case DELETE:
|
||||||
if (isSubscription(theResourceModifiedMessage)) {
|
if (isSubscription(theResourceModifiedMessage)) {
|
||||||
mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext));
|
mySubscriptionRegistry.unregisterSubscription(theResourceModifiedMessage.getId(myFhirContext).getIdPart());
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
case CREATE:
|
case CREATE:
|
||||||
|
|
|
@ -5,6 +5,7 @@ import org.junit.Test;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class ActiveSubscriptionCacheTest {
|
public class ActiveSubscriptionCacheTest {
|
||||||
|
@ -17,12 +18,13 @@ public class ActiveSubscriptionCacheTest {
|
||||||
assertFalse(activeSub1.isFlagForDeletion());
|
assertFalse(activeSub1.isFlagForDeletion());
|
||||||
List<String> saveIds = new ArrayList<>();
|
List<String> saveIds = new ArrayList<>();
|
||||||
|
|
||||||
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds);
|
List<String> idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
|
||||||
assertTrue(activeSub1.isFlagForDeletion());
|
assertTrue(activeSub1.isFlagForDeletion());
|
||||||
assertNotNull(activeSubscriptionCache.get(id1));
|
assertNotNull(activeSubscriptionCache.get(id1));
|
||||||
|
assertEquals(0, idsToDelete.size());
|
||||||
|
|
||||||
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds);
|
idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
|
||||||
assertNull(activeSubscriptionCache.get(id1));
|
assertThat(idsToDelete, containsInAnyOrder(id1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -35,14 +37,15 @@ public class ActiveSubscriptionCacheTest {
|
||||||
|
|
||||||
assertFalse(activeSub1.isFlagForDeletion());
|
assertFalse(activeSub1.isFlagForDeletion());
|
||||||
|
|
||||||
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds);
|
List<String> idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
|
||||||
assertTrue(activeSub1.isFlagForDeletion());
|
assertTrue(activeSub1.isFlagForDeletion());
|
||||||
assertNotNull(activeSubscriptionCache.get(id1));
|
assertNotNull(activeSubscriptionCache.get(id1));
|
||||||
|
assertEquals(0, idsToDelete.size());
|
||||||
|
|
||||||
saveIds.add(id1);
|
saveIds.add(id1);
|
||||||
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds);
|
idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
|
||||||
assertFalse(activeSub1.isFlagForDeletion());
|
assertFalse(activeSub1.isFlagForDeletion());
|
||||||
assertNotNull(activeSubscriptionCache.get(id1));
|
assertEquals(0, idsToDelete.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -58,9 +61,9 @@ public class ActiveSubscriptionCacheTest {
|
||||||
activeSub1.setFlagForDeletion(true);
|
activeSub1.setFlagForDeletion(true);
|
||||||
List<String> saveIds = new ArrayList<>();
|
List<String> saveIds = new ArrayList<>();
|
||||||
|
|
||||||
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds);
|
List<String> idsToDelete = activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
|
||||||
|
|
||||||
assertNull(activeSubscriptionCache.get(id1));
|
assertThat(idsToDelete, containsInAnyOrder(id1));
|
||||||
assertNotNull(activeSubscriptionCache.get(id2));
|
assertNotNull(activeSubscriptionCache.get(id2));
|
||||||
assertTrue(activeSub2.isFlagForDeletion());
|
assertTrue(activeSub2.isFlagForDeletion());
|
||||||
}
|
}
|
||||||
|
@ -80,7 +83,7 @@ public class ActiveSubscriptionCacheTest {
|
||||||
saveIds.add(id1);
|
saveIds.add(id1);
|
||||||
saveIds.add(id2);
|
saveIds.add(id2);
|
||||||
|
|
||||||
activeSubscriptionCache.unregisterAllSubscriptionsNotInCollection(saveIds);
|
activeSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(saveIds);
|
||||||
|
|
||||||
assertNotNull(activeSubscriptionCache.get(id1));
|
assertNotNull(activeSubscriptionCache.get(id1));
|
||||||
assertFalse(activeSub1.isFlagForDeletion());
|
assertFalse(activeSub1.isFlagForDeletion());
|
||||||
|
|
Loading…
Reference in New Issue