fixed triggering subscriptions

This commit is contained in:
Ken Stevens 2019-10-01 14:57:25 -04:00
parent 718b2c793d
commit f0b2bb8309
6 changed files with 24 additions and 12 deletions

View File

@ -142,7 +142,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList())); jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList())); jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
if (theSubscriptionId != null) { if (theSubscriptionId != null) {
jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); jobDetails.setSubscriptionId(theSubscriptionId.getIdPart());
} }
// Submit job for processing // Submit job for processing
@ -314,7 +314,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue()); msg.setSubscriptionId(theSubscriptionId);
return myExecutorService.submit(() -> { return myExecutorService.submit(() -> {
for (int i = 0; ; i++) { for (int i = 0; ; i++) {

View File

@ -399,7 +399,7 @@ public class InMemorySubscriptionMatcherR4Test {
subscription.setCriteriaString(criteria); subscription.setCriteriaString(criteria);
subscription.setIdElement(new IdType("Subscription", 123L)); subscription.setIdElement(new IdType("Subscription", 123L));
ResourceModifiedMessage msg = new ResourceModifiedMessage(myContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE); ResourceModifiedMessage msg = new ResourceModifiedMessage(myContext, patient, ResourceModifiedMessage.OperationTypeEnum.CREATE);
msg.setSubscriptionId("Subscription/123"); msg.setSubscriptionId("123");
msg.setId(new IdType("Patient/ABC")); msg.setId(new IdType("Patient/ABC"));
InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg); InMemoryMatchResult result = myInMemorySubscriptionMatcher.match(subscription, msg);
fail(); fail();

View File

@ -1,9 +1,12 @@
package ca.uhn.fhir.jpa.subscription.module.channel; package ca.uhn.fhir.jpa.subscription.module.channel;
import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder; import com.google.common.collect.MultimapBuilder;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -11,6 +14,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
class SubscriptionChannelCache { class SubscriptionChannelCache {
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
private final Map<String, SubscriptionChannelWithHandlers> myCache = new ConcurrentHashMap<>(); private final Map<String, SubscriptionChannelWithHandlers> myCache = new ConcurrentHashMap<>();
public SubscriptionChannelWithHandlers get(String theChannelName) { public SubscriptionChannelWithHandlers get(String theChannelName) {
@ -44,4 +49,10 @@ class SubscriptionChannelCache {
public boolean containsKey(String theChannelName) { public boolean containsKey(String theChannelName) {
return myCache.containsKey(theChannelName); return myCache.containsKey(theChannelName);
} }
void logForUnitTest() {
for (String key : myCache.keySet()) {
ourLog.info("SubscriptionChannelCache: {}", key);
}
}
} }

View File

@ -60,7 +60,7 @@ public class SubscriptionChannelRegistry {
return; return;
} }
String channelName = theActiveSubscription.getChannelName(); String channelName = theActiveSubscription.getChannelName();
ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName); ourLog.info("Removing subscription {} from channel {}: {}", theActiveSubscription.getId() ,channelName, myActiveSubscriptionByChannelName);
boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
if (!removed) { if (!removed) {
ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName); ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
@ -87,10 +87,11 @@ public class SubscriptionChannelRegistry {
@VisibleForTesting @VisibleForTesting
public void logForUnitTest() { public void logForUnitTest() {
ourLog.info("{} Channels: {}", this, size()); ourLog.info("{} Channels: {}", this, size());
mySubscriptionChannelCache.logForUnitTest();
for (String key : myActiveSubscriptionByChannelName.keySet()) { for (String key : myActiveSubscriptionByChannelName.keySet()) {
Collection<String> list = myActiveSubscriptionByChannelName.get(key); Collection<String> list = myActiveSubscriptionByChannelName.get(key);
for (String value : list) { for (String value : list) {
ourLog.info("{}: {}", key, value); ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value);
} }
} }
} }

View File

@ -46,18 +46,13 @@ public class SubscriptionChannelWithHandlers implements Closeable {
removeHandler(messageHandler); removeHandler(messageHandler);
} }
if (mySubscribableChannel instanceof DisposableBean) { if (mySubscribableChannel instanceof DisposableBean) {
int subscriberCount = mySubscribableChannel.getSubscriberCount(); tryDestroyChannel((DisposableBean) mySubscribableChannel);
if (subscriberCount > 0) {
ourLog.info("Channel {} still has {} subscribers. Not destroying.", myChannelName, subscriberCount);
} else {
ourLog.info("Channel for subscription {} has no subscribers. Destroying channel.", myChannelName);
tryDestroyChannel((DisposableBean) mySubscribableChannel);
}
} }
} }
private void tryDestroyChannel(DisposableBean theSubscribableChannel) { private void tryDestroyChannel(DisposableBean theSubscribableChannel) {
try { try {
ourLog.info("Destroying channel {}", myChannelName);
theSubscribableChannel.destroy(); theSubscribableChannel.destroy();
} catch (Exception e) { } catch (Exception e) {
ourLog.error("Failed to destroy channel bean", e); ourLog.error("Failed to destroy channel bean", e);
@ -67,4 +62,8 @@ public class SubscriptionChannelWithHandlers implements Closeable {
public MessageChannel getChannel() { public MessageChannel getChannel() {
return mySubscribableChannel; return mySubscribableChannel;
} }
public String getChannelName() {
return myChannelName;
}
} }

View File

@ -122,6 +122,7 @@ public class SubscriptionMatchingSubscriber implements MessageHandler {
if (isNotBlank(theMsg.getSubscriptionId())) { if (isNotBlank(theMsg.getSubscriptionId())) {
if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) { if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
// TODO KHS we should use a hash to look it up instead of this full table scan
ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId()); ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
continue; continue;
} }