2521 delete subscription resource (#3235)
* 2521 will set subscriptionst hat cant be initialized to error state to prevent infinite retries * 2521 add changelog * cleanup * fix issue in merge * fix bad merge * bad merge * 2521 review fixes and test fixes * 2521 review fixes Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
This commit is contained in:
parent
9e20d62380
commit
f3c17865d7
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
type: fix
|
||||
title: "When a subscription fails to activate (either on the first attempt or on start up),
|
||||
hapi-fhir will log the exception, set the subscription to ERROR state, and continue on.
|
||||
This is to prevent hanging on startup for errors that cannot be resolved by infinite retries."
|
|
@ -31,9 +31,11 @@ import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
|||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedJsonMessage;
|
||||
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
||||
import ca.uhn.fhir.util.SubscriptionUtil;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
@ -100,7 +102,8 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
|
|||
CanonicalSubscriptionChannelType subscriptionChannelType = mySubscriptionCanonicalizer.getChannelType(theSubscription);
|
||||
|
||||
// Only activate supported subscriptions
|
||||
if (subscriptionChannelType == null || !myDaoConfig.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) {
|
||||
if (subscriptionChannelType == null
|
||||
|| !myDaoConfig.getSupportedSubscriptionTypes().contains(subscriptionChannelType.toCanonical())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -118,17 +121,24 @@ public class SubscriptionActivatingSubscriber extends BaseSubscriberForSubscript
|
|||
IFhirResourceDao subscriptionDao = myDaoRegistry.getSubscriptionDao();
|
||||
SystemRequestDetails srd = SystemRequestDetails.forAllPartition();
|
||||
|
||||
IBaseResource subscription = subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartition());
|
||||
subscription.setId(subscription.getIdElement().toVersionless());
|
||||
|
||||
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS);
|
||||
IBaseResource subscription = null;
|
||||
try {
|
||||
// read can throw ResourceGoneException
|
||||
// if this happens, we will treat this as a failure to activate
|
||||
subscription = subscriptionDao.read(theSubscription.getIdElement(), SystemRequestDetails.forAllPartition());
|
||||
subscription.setId(subscription.getIdElement().toVersionless());
|
||||
|
||||
ourLog.info("Activating subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), SubscriptionConstants.REQUESTED_STATUS, SubscriptionConstants.ACTIVE_STATUS);
|
||||
SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ACTIVE_STATUS);
|
||||
subscriptionDao.update(subscription, srd);
|
||||
return true;
|
||||
} catch (final UnprocessableEntityException e) {
|
||||
} catch (final UnprocessableEntityException | ResourceGoneException e) {
|
||||
subscription = subscription != null ? subscription : theSubscription;
|
||||
ourLog.error("Failed to activate subscription "
|
||||
+ subscription.getIdElement()
|
||||
+ " : " + e.getMessage());
|
||||
ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
|
||||
SubscriptionUtil.setStatus(myFhirContext, subscription, "error");
|
||||
SubscriptionUtil.setStatus(myFhirContext, subscription, SubscriptionConstants.ERROR_STATUS);
|
||||
SubscriptionUtil.setReason(myFhirContext, subscription, e.getMessage());
|
||||
subscriptionDao.update(subscription, srd);
|
||||
return false;
|
||||
|
|
|
@ -58,7 +58,7 @@ import java.util.stream.Collectors;
|
|||
public class SubscriptionLoader implements IResourceChangeListener {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
|
||||
private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
|
||||
private static long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
|
||||
private static final long REFRESH_INTERVAL = DateUtils.MILLIS_PER_MINUTE;
|
||||
|
||||
private final Object mySyncSubscriptionsLock = new Object();
|
||||
@Autowired
|
||||
|
@ -134,6 +134,8 @@ public class SubscriptionLoader implements IResourceChangeListener {
|
|||
}
|
||||
|
||||
synchronized int doSyncSubscriptionsWithRetry() {
|
||||
// retry runs MAX_RETRIES times
|
||||
// and if errors result every time, it will fail
|
||||
Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES);
|
||||
return syncSubscriptionRetrier.runWithRetry();
|
||||
}
|
||||
|
@ -186,10 +188,15 @@ public class SubscriptionLoader implements IResourceChangeListener {
|
|||
String nextId = resource.getIdElement().getIdPart();
|
||||
allIds.add(nextId);
|
||||
|
||||
// internally, subscriptions that cannot activate
|
||||
// will be set to error
|
||||
boolean activated = mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(resource);
|
||||
if (activated) {
|
||||
activatedCount++;
|
||||
}
|
||||
else {
|
||||
logSubscriptionNotActivatedPlusErrorIfPossible(resource);
|
||||
}
|
||||
|
||||
boolean registered = mySubscriptionRegistry.registerSubscriptionUnlessAlreadyRegistered(resource);
|
||||
if (registered) {
|
||||
|
@ -202,6 +209,32 @@ public class SubscriptionLoader implements IResourceChangeListener {
|
|||
return activatedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs
|
||||
* @param theSubscription
|
||||
*/
|
||||
private void logSubscriptionNotActivatedPlusErrorIfPossible(IBaseResource theSubscription) {
|
||||
String error;
|
||||
if (theSubscription instanceof Subscription) {
|
||||
error = ((Subscription) theSubscription).getError();
|
||||
}
|
||||
else if (theSubscription instanceof org.hl7.fhir.dstu3.model.Subscription) {
|
||||
error = ((org.hl7.fhir.dstu3.model.Subscription) theSubscription).getError();
|
||||
}
|
||||
else if (theSubscription instanceof org.hl7.fhir.dstu2.model.Subscription) {
|
||||
error = ((org.hl7.fhir.dstu2.model.Subscription) theSubscription).getError();
|
||||
}
|
||||
else {
|
||||
error = "";
|
||||
}
|
||||
ourLog.error("Subscription "
|
||||
+ theSubscription.getIdElement().getIdPart()
|
||||
+ " could not be activated."
|
||||
+ " This will not prevent startup, but it could lead to undesirable outcomes! "
|
||||
+ error
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleInit(Collection<IIdType> theResourceIds) {
|
||||
if (!subscriptionsDaoExists()) {
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
package ca.uhn.fhir.jpa.subscription.match.matcher.subscriber;
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.api.config.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator;
|
||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer;
|
||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionConstants;
|
||||
import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionRegistry;
|
||||
import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType;
|
||||
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.Logger;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.Appender;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.Spy;
|
||||
import org.mockito.internal.util.collections.Sets;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class SubscriptionActivatingSubscriberTest {
|
||||
|
||||
private Logger ourLogger;
|
||||
|
||||
@Mock
|
||||
private Appender<ILoggingEvent> myAppender;
|
||||
|
||||
@Spy
|
||||
private FhirContext fhirContext = FhirContext.forR4Cached();
|
||||
|
||||
@Mock
|
||||
private SubscriptionRegistry mySubscriptionRegistry;
|
||||
|
||||
@Mock
|
||||
private DaoRegistry myDaoRegistry;
|
||||
|
||||
@Mock
|
||||
private SubscriptionCanonicalizer mySubscriptionCanonicallizer;
|
||||
|
||||
@Mock
|
||||
private DaoConfig myDaoConfig;
|
||||
|
||||
@Mock
|
||||
private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
|
||||
|
||||
@InjectMocks
|
||||
private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
|
||||
|
||||
private Level myStoredLogLevel;
|
||||
|
||||
@BeforeEach
|
||||
public void init() {
|
||||
ourLogger = (Logger) LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
|
||||
|
||||
myStoredLogLevel = ourLogger.getLevel();
|
||||
ourLogger.addAppender(myAppender);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void end() {
|
||||
ourLogger.detachAppender(myAppender);
|
||||
ourLogger.setLevel(myStoredLogLevel);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void activateSubscriptionIfRequired_activationFails_setsStatusOfSubscriptionToError() {
|
||||
CanonicalSubscriptionChannelType type = CanonicalSubscriptionChannelType.RESTHOOK;
|
||||
Subscription subscription = new Subscription();
|
||||
subscription.setId("Subscription/123");
|
||||
String exceptionMsg = "Gone Exception";
|
||||
int totalInfoLogs = 1;
|
||||
|
||||
ourLogger.setLevel(Level.ERROR);
|
||||
IFhirResourceDao dao = Mockito.mock(IFhirResourceDao.class);
|
||||
|
||||
// when
|
||||
Mockito.when(mySubscriptionCanonicallizer.getChannelType(Mockito.any(IBaseResource.class)))
|
||||
.thenReturn(type);
|
||||
Mockito.when(myDaoConfig.getSupportedSubscriptionTypes())
|
||||
.thenReturn(Sets.newSet(type.toCanonical()));
|
||||
Mockito.when(mySubscriptionCanonicallizer.getSubscriptionStatus(Mockito.any(IBaseResource.class)))
|
||||
.thenReturn(SubscriptionConstants.REQUESTED_STATUS);
|
||||
Mockito.when(myDaoRegistry.getSubscriptionDao())
|
||||
.thenReturn(dao);
|
||||
Mockito.when(dao.read(Mockito.any(IIdType.class), Mockito.any(SystemRequestDetails.class)))
|
||||
.thenThrow(new ResourceGoneException(exceptionMsg));
|
||||
|
||||
// test
|
||||
boolean isActivated = mySubscriptionActivatingSubscriber.activateSubscriptionIfRequired(subscription);
|
||||
|
||||
// verify
|
||||
Assertions.assertFalse(isActivated);
|
||||
ArgumentCaptor<IBaseResource> captor = ArgumentCaptor.forClass(IBaseResource.class);
|
||||
Mockito.verify(dao).update(captor.capture(), Mockito.any(SystemRequestDetails.class));
|
||||
IBaseResource savedResource = captor.getValue();
|
||||
Assertions.assertTrue(savedResource instanceof Subscription);
|
||||
Assertions.assertEquals(Subscription.SubscriptionStatus.ERROR, ((Subscription)savedResource).getStatus());
|
||||
|
||||
ArgumentCaptor<ILoggingEvent> appenderCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||
Mockito.verify(myAppender, Mockito.times(totalInfoLogs))
|
||||
.doAppend(appenderCaptor.capture());
|
||||
List<ILoggingEvent> events = appenderCaptor.getAllValues();
|
||||
Assertions.assertEquals(totalInfoLogs, events.size());
|
||||
Assertions.assertTrue(events.get(0).getMessage().contains(exceptionMsg));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
package ca.uhn.fhir.jpa.subscription.match.registry;
|
||||
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
|
||||
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
|
||||
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerCache;
|
||||
import ca.uhn.fhir.jpa.cache.IResourceChangeListenerRegistry;
|
||||
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.subscription.match.matcher.subscriber.SubscriptionActivatingSubscriber;
|
||||
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.server.SimpleBundleProvider;
|
||||
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
|
||||
import ch.qos.logback.classic.Level;
|
||||
import ch.qos.logback.classic.Logger;
|
||||
import ch.qos.logback.classic.spi.ILoggingEvent;
|
||||
import ch.qos.logback.core.Appender;
|
||||
import org.hl7.fhir.instance.model.api.IBaseResource;
|
||||
import org.hl7.fhir.r4.model.Subscription;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.Spy;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class SubscriptionLoaderTest {
|
||||
|
||||
private Logger ourLogger;
|
||||
|
||||
@Spy
|
||||
private FhirContext myFhirContext = FhirContext.forR4Cached();
|
||||
|
||||
@Mock
|
||||
private Appender<ILoggingEvent> myAppender;
|
||||
|
||||
@Mock
|
||||
private SubscriptionRegistry mySubscriptionRegistry;
|
||||
|
||||
@Mock
|
||||
private DaoRegistry myDaoRegistery;
|
||||
|
||||
@Mock
|
||||
private SubscriptionActivatingSubscriber mySubscriptionActivatingInterceptor;
|
||||
|
||||
@Mock
|
||||
private ISearchParamRegistry mySearchParamRegistry;
|
||||
|
||||
@Mock
|
||||
private IResourceChangeListenerRegistry myResourceChangeListenerRegistry;
|
||||
|
||||
@Mock
|
||||
private ISchedulerService mySchedulerSvc;
|
||||
|
||||
// used in init. But can be used elsewhere
|
||||
@Mock
|
||||
private IResourceChangeListenerCache mySubscriptionCache;
|
||||
|
||||
@InjectMocks
|
||||
private SubscriptionLoader mySubscriptionLoader;
|
||||
|
||||
private Level myStoredLogLevel;
|
||||
|
||||
@BeforeEach
|
||||
public void init() {
|
||||
ourLogger = (Logger) LoggerFactory.getLogger(SubscriptionLoader.class);
|
||||
|
||||
myStoredLogLevel = ourLogger.getLevel();
|
||||
ourLogger.addAppender(myAppender);
|
||||
|
||||
Mockito.when(myResourceChangeListenerRegistry.registerResourceResourceChangeListener(
|
||||
Mockito.anyString(),
|
||||
Mockito.any(SearchParameterMap.class),
|
||||
Mockito.any(SubscriptionLoader.class),
|
||||
Mockito.anyLong()
|
||||
)).thenReturn(mySubscriptionCache);
|
||||
|
||||
mySubscriptionLoader.registerListener();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void end() {
|
||||
ourLogger.detachAppender(myAppender);
|
||||
ourLogger.setLevel(myStoredLogLevel);
|
||||
}
|
||||
|
||||
private IBundleProvider getSubscriptionList(List<IBaseResource> theReturnedResource) {
|
||||
IBundleProvider subscriptionList = new SimpleBundleProvider(theReturnedResource);
|
||||
|
||||
subscriptionList.getAllResources().addAll(theReturnedResource);
|
||||
return subscriptionList;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void syncSubscriptions_withInactiveSubscriptionFailing_Syncs() {
|
||||
// setup
|
||||
Subscription subscription = new Subscription();
|
||||
subscription.setId("Subscription/123");
|
||||
subscription.setError("THIS IS AN ERROR");
|
||||
IFhirResourceDao<Subscription> subscriptionDao = Mockito.mock(IFhirResourceDao.class);
|
||||
|
||||
ourLogger.setLevel(Level.ERROR);
|
||||
|
||||
// when
|
||||
Mockito.when(myDaoRegistery.getSubscriptionDao())
|
||||
.thenReturn(subscriptionDao);
|
||||
Mockito.when(myDaoRegistery.isResourceTypeSupported(Mockito.anyString()))
|
||||
.thenReturn(true);
|
||||
Mockito.when(subscriptionDao.search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class)))
|
||||
.thenReturn(getSubscriptionList(
|
||||
Collections.singletonList(subscription)
|
||||
));
|
||||
Mockito.when(mySchedulerSvc.isStopping())
|
||||
.thenReturn(false);
|
||||
|
||||
Mockito.when(mySubscriptionActivatingInterceptor.activateSubscriptionIfRequired(Mockito.any(IBaseResource.class)))
|
||||
.thenReturn(false);
|
||||
|
||||
// test
|
||||
mySubscriptionLoader.syncSubscriptions();
|
||||
|
||||
// verify
|
||||
Mockito.verify(subscriptionDao)
|
||||
.search(Mockito.any(SearchParameterMap.class), Mockito.any(SystemRequestDetails.class));
|
||||
|
||||
ArgumentCaptor<ILoggingEvent> eventCaptor = ArgumentCaptor.forClass(ILoggingEvent.class);
|
||||
Mockito.verify(myAppender).doAppend(eventCaptor.capture());
|
||||
String actual = "Subscription "
|
||||
+ subscription.getIdElement().getIdPart()
|
||||
+ " could not be activated.";
|
||||
String msg = eventCaptor.getValue().getMessage();
|
||||
Assertions.assertTrue(msg
|
||||
.contains(actual),
|
||||
String.format("Expected %s, actual %s", msg, actual));
|
||||
Assertions.assertTrue(msg.contains(subscription.getError()));
|
||||
}
|
||||
}
|
|
@ -46,4 +46,5 @@ public class SubscriptionConstants {
|
|||
public static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
|
||||
public static final String REQUESTED_STATUS = Subscription.SubscriptionStatus.REQUESTED.toCode();
|
||||
public static final String ACTIVE_STATUS = Subscription.SubscriptionStatus.ACTIVE.toCode();
|
||||
public static final String ERROR_STATUS = Subscription.SubscriptionStatus.ERROR.toCode();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue