diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/2521-failing-subscriptions-wont-hang-startup.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/2521-failing-subscriptions-wont-hang-startup.yaml new file mode 100644 index 00000000000..b209aa934bd --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/5_7_0/2521-failing-subscriptions-wont-hang-startup.yaml @@ -0,0 +1,5 @@ +--- +type: fix +title: "When a subscription fails to activate (either on the first attempt or on start up), + hapi-fhir will log the exception, set the subscription to ERROR state, and continue on. + This is to prevent hanging on startup for errors that cannot be resolved by infinite retries." diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java index af52d72b41b..dd3806ffa1c 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriber.java @@ -31,9 +31,11 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage; import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage; +import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.util.SubscriptionUtil; import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.r4.model.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -100,7 +102,8 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript CanonicalSubscriptionChannelType subscriptionChannelType = mySubscriptionCanonicalizer.getChannelType(theSubscription); // Only activate supported subscriptions - if (subscriptionChannelType == null || !myDaoConfig.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) { + if (subscriptionChannelType == null + || !myDaoConfig.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) { return false; } @@ -118,17 +121,24 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao(); SystemRequestDetails srd = SystemRequestDetails.forAllPartition(); - IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartition()); - subscription.setId(subscription.getIdElement().toVersionless()); - - ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS); + IBaseResource subscription = null; try { + // read can throw ResourceGoneException + // if this happens, we will treat this as a failure to activate + subscription = subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartition()); + subscription.setId(subscription.getIdElement().toVersionless()); + + ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS); SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS); subscriptionDao.update(subscription, srd); return true; - } catch (final UnprocessableEntityException e) { + } catch (final UnprocessableEntityException | ResourceGoneException e) { + subscription = subscription != null ? subscription : theSubscription; + ourLog.error("Failed to activate subscription " + + subscription.getIdElement() + + " : " + e.getMessage()); ourLog.info("Changing status of {} to ERROR", subscription.getIdElement()); - SubscriptionUtil.setStatus(myFhirContext, subscription, "error"); + SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ERROR_STATUS); SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage()); subscriptionDao.update(subscription, srd); return false; diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java index cd988a34939..cffb5cdfa66 100644 --- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java +++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoader.java @@ -58,7 +58,7 @@ import java.util.stream.Collectors; public class SubscriptionLoader implements IResourceChangeListener { private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes - private static long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; + private static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE; private final Object mySyncSubscriptionsLock = new Object(); @Autowired @@ -134,6 +134,8 @@ public class SubscriptionLoader implements IResourceChangeListener { } synchronized int doSyncSubscriptionsWithRetry() { + // retry runs MAX_RETRIES times + // and if errors result every time, it will fail Retrier syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES); return syncSubscriptionRetrier.runWithRetry(); } @@ -186,10 +188,15 @@ public class SubscriptionLoader implements IResourceChangeListener { String nextId = resource.getIdElement().getIdPart(); allIds.add(nextId); + // internally, subscriptions that cannot activate + // will be set to error boolean activated = mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(resource); if (activated) { activatedCount++; } + else { + logSubscriptionNotActivatedPlusErrorIfPossible(resource); + } boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource); if (registered) { @@ -202,6 +209,32 @@ public class SubscriptionLoader implements IResourceChangeListener { return activatedCount; } + /** + * Logs + * @param theSubscription + */ + private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) { + String error; + if (theSubscription instanceof Subscription) { + error = ((Subscription) theSubscription).getError(); + } + else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) { + error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError(); + } + else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) { + error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError(); + } + else { + error = ""; + } + ourLog.error("Subscription " + + theSubscription.getIdElement().getIdPart() + + " could not be activated." + + " This will not prevent startup, but it could lead to undesirable outcomes! " + + error + ); + } + @Override public void handleInit(Collection theResourceIds) { if (!subscriptionsDaoExists()) { diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriberTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriberTest.java new file mode 100644 index 00000000000..0a34918daf0 --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/matcher/subscriber/SubscriptionActivatingSubscriberTest.java @@ -0,0 +1,123 @@ +package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.config.DaoConfig; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; +import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; +import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; +import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants; +import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry; +import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; +import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Appender; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.r4.model.Subscription; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.internal.util.collections.Sets; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.LoggerFactory; + +import java.util.List; + +@ExtendWith(MockitoExtension.class) +public class SubscriptionActivatingSubscriberTest { + + private Logger ourLogger; + + @Mock + private Appender myAppender; + + @Spy + private FhirContext fhirContext = FhirContext.forR4Cached(); + + @Mock + private SubscriptionRegistry mySubscriptionRegistry; + + @Mock + private DaoRegistry myDaoRegistry; + + @Mock + private SubscriptionCanonicalizer mySubscriptionCanonicallizer; + + @Mock + private DaoConfig myDaoConfig; + + @Mock + private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator; + + @InjectMocks + private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber; + + private Level myStoredLogLevel; + + @BeforeEach + public void init() { + ourLogger = (Logger) LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class); + + myStoredLogLevel = ourLogger.getLevel(); + ourLogger.addAppender(myAppender); + } + + @AfterEach + public void end() { + ourLogger.detachAppender(myAppender); + ourLogger.setLevel(myStoredLogLevel); + } + + @Test + public void activateSubscriptionIfRequired_activationFails_setsStatusOfSubscriptionToError() { + CanonicalSubscriptionChannelType type = CanonicalSubscriptionChannelType.RESTHOOK; + Subscription subscription = new Subscription(); + subscription.setId("Subscription/123"); + String exceptionMsg = "Gone Exception"; + int totalInfoLogs = 1; + + ourLogger.setLevel(Level.ERROR); + IFhirResourceDao dao = Mockito.mock(IFhirResourceDao.class); + + // when + Mockito.when(mySubscriptionCanonicallizer.getChannelType(Mockito.any(IBaseResource.class))) + .thenReturn(type); + Mockito.when(myDaoConfig.getSupportedSubscriptionTypes()) + .thenReturn(Sets.newSet(type.toCanonical())); + Mockito.when(mySubscriptionCanonicallizer.getSubscriptionStatus(Mockito.any(IBaseResource.class))) + .thenReturn(SubscriptionConstants.REQUESTED_STATUS); + Mockito.when(myDaoRegistry.getSubscriptionDao()) + .thenReturn(dao); + Mockito.when(dao.read(Mockito.any(IIdType.class), Mockito.any(SystemRequestDetails.class))) + .thenThrow(new ResourceGoneException(exceptionMsg)); + + // test + boolean isActivated = mySubscriptionActivatingSubscriber.activateSubscriptionIfRequired(subscription); + + // verify + Assertions.assertFalse(isActivated); + ArgumentCaptor captor = ArgumentCaptor.forClass(IBaseResource.class); + Mockito.verify(dao).update(captor.capture(), Mockito.any(SystemRequestDetails.class)); + IBaseResource savedResource = captor.getValue(); + Assertions.assertTrue(savedResource instanceof Subscription); + Assertions.assertEquals(Subscription.SubscriptionStatus.ERROR, ((Subscription)savedResource).getStatus()); + + ArgumentCaptor appenderCaptor = ArgumentCaptor.forClass(ILoggingEvent.class); + Mockito.verify(myAppender, Mockito.times(totalInfoLogs)) + .doAppend(appenderCaptor.capture()); + List events = appenderCaptor.getAllValues(); + Assertions.assertEquals(totalInfoLogs, events.size()); + Assertions.assertTrue(events.get(0).getMessage().contains(exceptionMsg)); + } +} diff --git a/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java new file mode 100644 index 00000000000..63a8cdbd01b --- /dev/null +++ b/hapi-fhir-jpaserver-subscription/src/test/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionLoaderTest.java @@ -0,0 +1,149 @@ +package ca.uhn.fhir.jpa.subscription.match.registry; + +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.jpa.api.dao.DaoRegistry; +import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; +import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache; +import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry; +import ca.uhn.fhir.jpa.model.sched.ISchedulerService; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; +import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; +import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber; + +import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.server.SimpleBundleProvider; +import ca.uhn.fhir.rest.server.util.ISearchParamRegistry; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Appender; +import org.hl7.fhir.instance.model.api.IBaseResource; +import org.hl7.fhir.r4.model.Subscription; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; + +@ExtendWith(MockitoExtension.class) +public class SubscriptionLoaderTest { + + private Logger ourLogger; + + @Spy + private FhirContext myFhirContext = FhirContext.forR4Cached(); + + @Mock + private Appender myAppender; + + @Mock + private SubscriptionRegistry mySubscriptionRegistry; + + @Mock + private DaoRegistry myDaoRegistery; + + @Mock + private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor; + + @Mock + private ISearchParamRegistry mySearchParamRegistry; + + @Mock + private IResourceChangeListenerRegistry myResourceChangeListenerRegistry; + + @Mock + private ISchedulerService mySchedulerSvc; + + // used in init. But can be used elsewhere + @Mock + private IResourceChangeListenerCache mySubscriptionCache; + + @InjectMocks + private SubscriptionLoader mySubscriptionLoader; + + private Level myStoredLogLevel; + + @BeforeEach + public void init() { + ourLogger = (Logger) LoggerFactory.getLogger(SubscriptionLoader.class); + + myStoredLogLevel = ourLogger.getLevel(); + ourLogger.addAppender(myAppender); + + Mockito.when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener( + Mockito.anyString(), + Mockito.any(SearchParameterMap.class), + Mockito.any(SubscriptionLoader.class), + Mockito.anyLong() + )).thenReturn(mySubscriptionCache); + + mySubscriptionLoader.registerListener(); + } + + @AfterEach + public void end() { + ourLogger.detachAppender(myAppender); + ourLogger.setLevel(myStoredLogLevel); + } + + private IBundleProvider getSubscriptionList(List theReturnedResource) { + IBundleProvider subscriptionList = new SimpleBundleProvider(theReturnedResource); + + subscriptionList.getAllResources().addAll(theReturnedResource); + return subscriptionList; + } + + @Test + public void syncSubscriptions_withInactiveSubscriptionFailing_Syncs() { + // setup + Subscription subscription = new Subscription(); + subscription.setId("Subscription/123"); + subscription.setError("THIS IS AN ERROR"); + IFhirResourceDao subscriptionDao = Mockito.mock(IFhirResourceDao.class); + + ourLogger.setLevel(Level.ERROR); + + // when + Mockito.when(myDaoRegistery.getSubscriptionDao()) + .thenReturn(subscriptionDao); + Mockito.when(myDaoRegistery.isResourceTypeSupported(Mockito.anyString())) + .thenReturn(true); + Mockito.when(subscriptionDao.search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class))) + .thenReturn(getSubscriptionList( + Collections.singletonList(subscription) + )); + Mockito.when(mySchedulerSvc.isStopping()) + .thenReturn(false); + + Mockito.when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(Mockito.any(IBaseResource.class))) + .thenReturn(false); + + // test + mySubscriptionLoader.syncSubscriptions(); + + // verify + Mockito.verify(subscriptionDao) + .search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class)); + + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(ILoggingEvent.class); + Mockito.verify(myAppender).doAppend(eventCaptor.capture()); + String actual = "Subscription " + + subscription.getIdElement().getIdPart() + + " could not be activated."; + String msg = eventCaptor.getValue().getMessage(); + Assertions.assertTrue(msg + .contains(actual), + String.format("Expected %s, actual %s", msg, actual)); + Assertions.assertTrue(msg.contains(subscription.getError())); + } +} diff --git a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionConstants.java b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionConstants.java index 9e988944d1d..e3e73376249 100644 --- a/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionConstants.java +++ b/hapi-fhir-storage/src/main/java/ca/uhn/fhir/jpa/subscription/match/registry/SubscriptionConstants.java @@ -46,4 +46,5 @@ public class SubscriptionConstants { public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type"; public static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode(); public static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode(); + public static final String ERROR_STATUS = Subscription.SubscriptionStatus.ERROR.toCode(); }