Fix an issue in subscription interceptor which prevented the server from

starting if an invalid subscription was already in the database
This commit is contained in:
jamesagnew 2018-02-03 19:20:58 -05:00
parent 17b1ff727e
commit 86a0774305
14 changed files with 391 additions and 300 deletions

View File

@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
@ -92,8 +93,10 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
@Autowired(required = false)
@Qualifier("myEventDefinitionDaoR4")
private IFhirResourceDao<org.hl7.fhir.r4.model.EventDefinition> myEventDefinitionDaoR4;
@Autowired
@Autowired()
private PlatformTransactionManager myTxManager;
@Autowired
private AsyncTaskExecutor myAsyncTaskExecutor;
/**
* Constructor
@ -364,6 +367,11 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
});
}
@VisibleForTesting
public void setAsyncTaskExecutorForUnitTest(AsyncTaskExecutor theAsyncTaskExecutor) {
myAsyncTaskExecutor = theAsyncTaskExecutor;
}
public void setFhirContext(FhirContext theCtx) {
myCtx = theCtx;
}
@ -455,7 +463,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
}
if (mySubscriptionActivatingSubscriber == null) {
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager);
mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, myTxManager, myAsyncTaskExecutor);
}
registerSubscriptionCheckingSubscriber();

View File

@ -25,41 +25,51 @@ import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.SubscriptionUtil;
import org.apache.commons.lang3.time.DateUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.*;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("unchecked")
public class SubscriptionActivatingSubscriber {
private static boolean ourWaitForSubscriptionActivationSynchronouslyForUnitTest;
private final IFhirResourceDao mySubscriptionDao;
private final BaseSubscriptionInterceptor mySubscriptionInterceptor;
private final PlatformTransactionManager myTransactionManager;
private final AsyncTaskExecutor myTaskExecutor;
private Logger ourLog = LoggerFactory.getLogger(SubscriptionActivatingSubscriber.class);
private FhirContext myCtx;
private Subscription.SubscriptionChannelType myChannelType;
/**
* Constructor
*/
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTransactionManager) {
public SubscriptionActivatingSubscriber(IFhirResourceDao<? extends IBaseResource> theSubscriptionDao, Subscription.SubscriptionChannelType theChannelType, BaseSubscriptionInterceptor theSubscriptionInterceptor, PlatformTransactionManager theTransactionManager, AsyncTaskExecutor theTaskExecutor) {
mySubscriptionDao = theSubscriptionDao;
mySubscriptionInterceptor = theSubscriptionInterceptor;
myChannelType = theChannelType;
myCtx = theSubscriptionDao.getContext();
myTransactionManager = theTransactionManager;
myTaskExecutor = theTaskExecutor;
Validate.notNull(theTaskExecutor);
}
public void activateAndRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
@ -75,14 +85,40 @@ public class SubscriptionActivatingSubscriber {
final String activeStatus = Subscription.SubscriptionStatus.ACTIVE.toCode();
if (requestedStatus.equals(statusString)) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
/*
* If we're in a transaction, we don't want to try and change the status from
* requested to active within the same transaction because it's too late by
* the time we get here to make modifications to the payload.
*
* So, we register a synchronization, meaning that when the transaction is
* finished, we'll schedule a task to do this in a separate worker thread
* to avoid any possibility of conflict.
*/
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
activateSubscription(status, activeStatus, theSubscription, requestedStatus);
Future<?> activationFuture = myTaskExecutor.submit(new Runnable() {
@Override
public void run() {
activateSubscription(activeStatus, theSubscription, requestedStatus);
}
});
/*
* If we're running in a unit test, it's nice to be predictable in
* terms of order... In the real world it's a recipe for deadlocks
*/
if (ourWaitForSubscriptionActivationSynchronouslyForUnitTest) {
try {
activationFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
ourLog.error("Failed to activate subscription", e);
}
}
}
});
} else {
activateSubscription(status, activeStatus, theSubscription, requestedStatus);
activateSubscription(activeStatus, theSubscription, requestedStatus);
}
} else if (activeStatus.equals(statusString)) {
if (!mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
@ -97,21 +133,22 @@ public class SubscriptionActivatingSubscriber {
}
}
private void activateSubscription(IPrimitiveType<?> theStatus, String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
theStatus.setValueAsString(theActiveStatus);
ourLog.info("Activating and registering subscription {} from status {} to {}", theSubscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus);
try {
mySubscriptionDao.update(theSubscription);
} catch (final UnprocessableEntityException e) {
ourLog.info("Changing status of {} to ERROR", theSubscription.getIdElement());
private void activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement());
ourLog.info("Activating and registering subscription {} from status {} to {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus);
try {
SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
mySubscriptionDao.update(subscription);
mySubscriptionInterceptor.registerSubscription(subscription.getIdElement(), subscription);
} catch (final UnprocessableEntityException e) {
ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
SubscriptionUtil.setStatus(myCtx, subscription, "error");
SubscriptionUtil.setReason(myCtx, subscription, e.getMessage());
mySubscriptionDao.update(subscription);
}
}
public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
switch (theOperationType) {
@ -137,4 +174,9 @@ public class SubscriptionActivatingSubscriber {
}
@VisibleForTesting
public static void setWaitForSubscriptionActivationSynchronouslyForUnitTest(boolean theWaitForSubscriptionActivationSynchronouslyForUnitTest) {
ourWaitForSubscriptionActivationSynchronouslyForUnitTest = theWaitForSubscriptionActivationSynchronouslyForUnitTest;
}
}

View File

@ -74,7 +74,7 @@ public class SubscriptionsRequireManualActivationInterceptorDstu2 extends Server
if (newStatus == null) {
String actualCode = subscription.getStatusElement().getValueAsString();
throw new UnprocessableEntityException("Can not " + theOperation.getCode() + " resource: Subscription.status must be populated" + ((isNotBlank(actualCode)) ? " (invalid value " + actualCode + ")" : ""));
throw new UnprocessableEntityException("Can not " + theOperation.getCode() + " resource: Subscription.status must be populated on this server" + ((isNotBlank(actualCode)) ? " (invalid value " + actualCode + ")" : ""));
}
if (theOldResourceOrNull != null) {
@ -108,7 +108,7 @@ public class SubscriptionsRequireManualActivationInterceptorDstu2 extends Server
}
if (theSubscription.getStatus() == null) {
throw new UnprocessableEntityException("Can not " + theOperation.getCode().toLowerCase() + " resource: Subscription.status must be populated");
throw new UnprocessableEntityException("Can not " + theOperation.getCode().toLowerCase() + " resource: Subscription.status must be populated on this server");
}
throw new UnprocessableEntityException("Subscription.status must be '" + SubscriptionStatusEnum.OFF.getCode() + "' or '" + SubscriptionStatusEnum.REQUESTED.getCode() + "' on a newly created subscription");

View File

@ -74,7 +74,7 @@ public class SubscriptionsRequireManualActivationInterceptorDstu3 extends Server
if (newStatus == null) {
String actualCode = subscription.getStatusElement().getValueAsString();
throw new UnprocessableEntityException("Can not " + theOperation.getCode() + " resource: Subscription.status must be populated" + ((isNotBlank(actualCode)) ? " (invalid value " + actualCode + ")" : ""));
throw new UnprocessableEntityException("Can not " + theOperation.getCode() + " resource: Subscription.status must be populated on this server" + ((isNotBlank(actualCode)) ? " (invalid value " + actualCode + ")" : ""));
}
if (theOldResourceOrNull != null) {
@ -108,7 +108,7 @@ public class SubscriptionsRequireManualActivationInterceptorDstu3 extends Server
}
if (theSubscription.getStatus() == null) {
throw new UnprocessableEntityException("Can not " + theOperation.getCode().toLowerCase() + " resource: Subscription.status must be populated");
throw new UnprocessableEntityException("Can not " + theOperation.getCode().toLowerCase() + " resource: Subscription.status must be populated on this server");
}
throw new UnprocessableEntityException("Subscription.status must be '" + SubscriptionStatus.OFF.toCode() + "' or '" + SubscriptionStatus.REQUESTED.toCode() + "' on a newly created subscription");

View File

@ -74,7 +74,7 @@ public class SubscriptionsRequireManualActivationInterceptorR4 extends ServerOpe
if (newStatus == null) {
String actualCode = subscription.getStatusElement().getValueAsString();
throw new UnprocessableEntityException("Can not " + theOperation.getCode() + " resource: Subscription.status must be populated" + ((isNotBlank(actualCode)) ? " (invalid value " + actualCode + ")" : ""));
throw new UnprocessableEntityException("Can not " + theOperation.getCode() + " resource: Subscription.status must be populated on this server" + ((isNotBlank(actualCode)) ? " (invalid value " + actualCode + ")" : ""));
}
if (theOldResourceOrNull != null) {
@ -108,7 +108,7 @@ public class SubscriptionsRequireManualActivationInterceptorR4 extends ServerOpe
}
if (theSubscription.getStatus() == null) {
throw new UnprocessableEntityException("Can not " + theOperation.getCode().toLowerCase() + " resource: Subscription.status must be populated");
throw new UnprocessableEntityException("Can not " + theOperation.getCode().toLowerCase() + " resource: Subscription.status must be populated on this server");
}
throw new UnprocessableEntityException("Subscription.status must be '" + SubscriptionStatus.OFF.toCode() + "' or '" + SubscriptionStatus.REQUESTED.toCode() + "' on a newly created subscription");

View File

@ -47,7 +47,7 @@ public class TestDstu3Config extends BaseJavaConfigDstu3 {
@Override
public Connection getConnection() throws SQLException {
public Connection getConnection() {
ConnectionWrapper retVal;
try {
retVal = new ConnectionWrapper(super.getConnection());

View File

@ -1,21 +1,35 @@
package ca.uhn.fhir.jpa.dao.dstu3;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import javax.persistence.EntityManager;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.TestDstu3Config;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.dao.dstu2.FhirResourceDaoDstu2SearchNoFtTest;
import ca.uhn.fhir.jpa.entity.ResourceIndexedSearchParamString;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainDstu3;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.parser.StrictErrorHandler;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.util.TestUtil;
import ca.uhn.fhir.util.UrlUtil;
import org.apache.commons.io.IOUtils;
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.Search;
import org.hl7.fhir.dstu3.hapi.ctx.IValidationSupport;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.junit.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -27,26 +41,13 @@ import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.TestDstu3Config;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.dao.dstu2.FhirResourceDaoDstu2SearchNoFtTest;
import ca.uhn.fhir.jpa.entity.ResourceIndexedSearchParamString;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
import ca.uhn.fhir.jpa.search.*;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
import ca.uhn.fhir.jpa.validation.JpaValidationSupportChainDstu3;
import ca.uhn.fhir.parser.IParser;
import ca.uhn.fhir.parser.StrictErrorHandler;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.method.MethodUtil;
import ca.uhn.fhir.util.TestUtil;
import ca.uhn.fhir.util.UrlUtil;
import javax.persistence.EntityManager;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {TestDstu3Config.class})
@ -254,6 +255,7 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
@Before
@Transactional()
public void beforePurgeDatabase() {
final EntityManager entityManager = this.myEntityManager;
purgeDatabase(entityManager, myTxManager, mySearchParamPresenceSvc, mySearchCoordinatorSvc, mySearchParamRegsitry);
}

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.dao.dstu3;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.resthook.SubscriptionRestHookInterceptor;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.TestUtil;
@ -9,6 +10,7 @@ import org.hl7.fhir.dstu3.model.Subscription;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.TransactionStatus;
@ -26,10 +28,16 @@ public class FhirResourceDaoDstu3InvalidSubscriptionTest extends BaseJpaDstu3Tes
@After
public void afterResetDao() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
myDaoConfig.setResourceServerIdStrategy(new DaoConfig().getResourceServerIdStrategy());
BaseHapiFhirDao.setValidationDisabledForUnitTest(false);
}
@Before
public void before() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true);
}
@Test
public void testCreateInvalidSubscriptionOkButCanNotActivate() {
Subscription s = new Subscription();
@ -45,7 +53,7 @@ public class FhirResourceDaoDstu3InvalidSubscriptionTest extends BaseJpaDstu3Tes
mySubscriptionDao.update(s);
fail();
} catch (UnprocessableEntityException e) {
assertEquals("Subscription.criteria must be in the form \"{Resource Type}?[params]", e.getMessage());
assertEquals("Subscription.criteria must be in the form \"{Resource Type}?[params]\"", e.getMessage());
}
}

View File

@ -2,6 +2,7 @@ package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.subscription.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.jpa.subscription.resthook.SubscriptionRestHookInterceptor;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.TestUtil;
@ -9,6 +10,7 @@ import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Subscription;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.TransactionStatus;
@ -26,10 +28,16 @@ public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test {
@After
public void afterResetDao() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
myDaoConfig.setResourceServerIdStrategy(new DaoConfig().getResourceServerIdStrategy());
BaseHapiFhirDao.setValidationDisabledForUnitTest(false);
}
@Before
public void before() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true);
}
@Test
public void testCreateInvalidSubscriptionOkButCanNotActivate() {
Subscription s = new Subscription();
@ -45,7 +53,7 @@ public class FhirResourceDaoR4InvalidSubscriptionTest extends BaseJpaR4Test {
mySubscriptionDao.update(s);
fail();
} catch (UnprocessableEntityException e) {
assertEquals("Subscription.criteria must be in the form \"{Resource Type}?[params]", e.getMessage());
assertEquals("Subscription.criteria must be in the form \"{Resource Type}?[params]\"", e.getMessage());
}
}

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.dao.data.ISearchDao;
import ca.uhn.fhir.jpa.entity.Search;
@ -45,11 +46,13 @@ public class FhirResourceDaoR4SearchPageExpiryTest extends BaseJpaR4Test {
public void before() {
StaleSearchDeletingSvcImpl staleSearchDeletingSvc = AopTestUtils.getTargetObject(myStaleSearchDeletingSvc);
staleSearchDeletingSvc.setCutoffSlackForUnitTest(0);
myDaoConfig.setCountSearchResultsUpTo(new DaoConfig().getCountSearchResultsUpTo());
}
@Before
public void beforeDisableResultReuse() {
myDaoConfig.setReuseCachedSearchResultsForMillis(null);
myDaoConfig.setCountSearchResultsUpTo(10000);
}
@Test

View File

@ -22,6 +22,7 @@ import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.AsyncTaskExecutor;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
@ -42,6 +43,8 @@ public class EmailSubscriptionDstu2Test extends BaseResourceProviderDstu2Test {
@Autowired
private List<IFhirResourceDao<?>> myResourceDaos;
@Autowired
private AsyncTaskExecutor myAsyncTaskExecutor;
@After
public void after() throws Exception {
@ -70,6 +73,7 @@ public class EmailSubscriptionDstu2Test extends BaseResourceProviderDstu2Test {
mySubscriber.setResourceDaos(myResourceDaos);
mySubscriber.setFhirContext(myFhirCtx);
mySubscriber.setTxManager(ourTxManager);
mySubscriber.setAsyncTaskExecutorForUnitTest(myAsyncTaskExecutor);
mySubscriber.start();
ourRestServer.registerInterceptor(mySubscriber);

View File

@ -1,35 +1,21 @@
package ca.uhn.fhir.jpa.subscription.email;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.provider.dstu3.BaseResourceProviderDstu3Test;
import ca.uhn.fhir.jpa.subscription.RestHookTestDstu2Test;
import ca.uhn.fhir.jpa.testutil.RandomServerPortProvider;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.annotation.Create;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.server.IResourceProvider;
import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.util.PortUtil;
import com.google.common.collect.Lists;
import com.icegreen.greenmail.store.FolderException;
import com.icegreen.greenmail.util.GreenMail;
import com.icegreen.greenmail.util.ServerSetup;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.hl7.fhir.dstu3.model.*;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -71,7 +57,8 @@ public class EmailSubscriptionDstu3Test extends BaseResourceProviderDstu3Test {
@Before
public void beforeRegisterEmailListener() throws FolderException {
ourTestSmtp.purgeEmailFromAllMailboxes();;
ourTestSmtp.purgeEmailFromAllMailboxes();
;
ourRestServer.registerInterceptor(ourEmailSubscriptionInterceptor);
JavaMailEmailSender emailSender = new JavaMailEmailSender();
@ -83,24 +70,6 @@ public class EmailSubscriptionDstu3Test extends BaseResourceProviderDstu3Test {
ourEmailSubscriptionInterceptor.setDefaultFromAddress("123@hapifhir.io");
}
@AfterClass
public static void afterClass() {
ourTestSmtp.stop();
}
@BeforeClass
public static void beforeClass() {
ourListenerPort = RandomServerPortProvider.findFreePort();
ServerSetup smtp = new ServerSetup(ourListenerPort, null, ServerSetup.PROTOCOL_SMTP);
smtp.setServerStartupTimeout(2000);
smtp.setReadTimeout(2000);
smtp.setConnectionTimeout(2000);
ourTestSmtp = new GreenMail(smtp);
ourTestSmtp.start();
}
private Subscription createSubscription(String theCriteria, String thePayload) throws InterruptedException {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");
@ -169,7 +138,6 @@ public class EmailSubscriptionDstu3Test extends BaseResourceProviderDstu3Test {
assertEquals(mySubscriptionIds.get(0).toUnqualifiedVersionless().getValue(), received.get(0).getHeader("X-FHIR-Subscription")[0]);
}
@Test
public void testEmailSubscriptionWithCustom() throws Exception {
String payload = "This is the body";
@ -180,6 +148,7 @@ public class EmailSubscriptionDstu3Test extends BaseResourceProviderDstu3Test {
Subscription subscriptionTemp = ourClient.read(Subscription.class, sub1.getId());
Assert.assertNotNull(subscriptionTemp);
subscriptionTemp.getChannel().addExtension()
.setUrl(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM)
.setValue(new StringType("mailto:myfrom@from.com"));
@ -235,14 +204,11 @@ public class EmailSubscriptionDstu3Test extends BaseResourceProviderDstu3Test {
.setValue(new StringType("This is a subject"));
subscriptionTemp.setIdElement(subscriptionTemp.getIdElement().toUnqualifiedVersionless());
IIdType id = ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute().getId();
ourLog.info("Subscription ID is: {}", id.getValue());
ourLog.info(myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(subscriptionTemp));
ourClient.update().resource(subscriptionTemp).withId(subscriptionTemp.getIdElement()).execute();
waitForQueueToDrain();
sendObservation(code, "SNOMED-CT");
waitForQueueToDrain();
@ -263,10 +229,32 @@ public class EmailSubscriptionDstu3Test extends BaseResourceProviderDstu3Test {
assertEquals("This is a subject", received.get(0).getSubject().toString().trim());
assertEquals("This is the body", received.get(0).getContent().toString().trim());
assertEquals(mySubscriptionIds.get(0).toUnqualifiedVersionless().getValue(), received.get(0).getHeader("X-FHIR-Subscription")[0]);
ourLog.info("Subscription: {}", myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(ourClient.history().onInstance(id).andReturnBundle(Bundle.class).execute()));
Subscription subscription = ourClient.read().resource(Subscription.class).withId(id.toUnqualifiedVersionless()).execute();
assertEquals(Subscription.SubscriptionStatus.ACTIVE, subscription.getStatus());
assertEquals("3", subscription.getIdElement().getVersionIdPart());
}
private void waitForQueueToDrain() throws InterruptedException {
RestHookTestDstu2Test.waitForQueueToDrain(ourEmailSubscriptionInterceptor);
}
@AfterClass
public static void afterClass() {
ourTestSmtp.stop();
}
@BeforeClass
public static void beforeClass() {
ourListenerPort = RandomServerPortProvider.findFreePort();
ServerSetup smtp = new ServerSetup(ourListenerPort, null, ServerSetup.PROTOCOL_SMTP);
smtp.setServerStartupTimeout(2000);
smtp.setReadTimeout(2000);
smtp.setConnectionTimeout(2000);
ourTestSmtp = new GreenMail(smtp);
ourTestSmtp.start();
}
}

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.subscription.r4;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.provider.r4.BaseResourceProviderR4Test;
import ca.uhn.fhir.jpa.subscription.RestHookTestDstu2Test;
import ca.uhn.fhir.jpa.subscription.SubscriptionActivatingSubscriber;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import ca.uhn.fhir.rest.annotation.Update;
import ca.uhn.fhir.rest.api.Constants;
@ -38,11 +39,22 @@ public class RestHookActivatesPreExistingSubscriptionsR4Test extends BaseResourc
private static List<String> ourContentTypes = new ArrayList<>();
private static List<String> ourHeaders = new ArrayList<>();
@After
public void afterResetSubscriptionActivatingInterceptor() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(false);
}
@After
public void afterUnregisterRestHookListener() {
ourRestServer.unregisterInterceptor(getRestHookSubscriptionInterceptor());
}
@Before
public void beforeSetSubscriptionActivatingInterceptor() {
SubscriptionActivatingSubscriber.setWaitForSubscriptionActivationSynchronouslyForUnitTest(true);
}
private Subscription createSubscription(String theCriteria, String thePayload, String theEndpoint) throws InterruptedException {
Subscription subscription = new Subscription();
subscription.setReason("Monitor new neonatal function (note, age will be determined by the monitor)");

View File

@ -21,11 +21,6 @@ package ca.uhn.fhir.spring.boot.autoconfigure;
*/
import java.util.List;
import javax.servlet.ServletException;
import javax.sql.DataSource;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jaxrs.server.AbstractJaxRsProvider;
import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu2;
@ -47,16 +42,10 @@ import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.rest.server.interceptor.ResponseValidatingInterceptor;
import okhttp3.OkHttpClient;
import org.apache.http.client.HttpClient;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ResourceCondition;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
@ -67,9 +56,17 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.util.CollectionUtils;
import javax.servlet.ServletException;
import javax.sql.DataSource;
import java.util.List;
/**
* {@link EnableAutoConfiguration Auto-configuration} for HAPI FHIR.
*
@ -93,6 +90,7 @@ public class FhirAutoConfiguration {
return fhirContext;
}
@Configuration
@ConditionalOnClass(AbstractJaxRsProvider.class)
@EnableConfigurationProperties(FhirProperties.class)
@ -127,6 +125,15 @@ public class FhirAutoConfiguration {
this.customizers = customizers.getIfAvailable();
}
private void customize() {
if (this.customizers != null) {
AnnotationAwareOrderComparator.sort(this.customizers);
for (FhirRestfulServerCustomizer customizer : this.customizers) {
customizer.customize(this);
}
}
}
@Bean
public ServletRegistrationBean fhirServerRegistrationBean() {
ServletRegistrationBean registration = new ServletRegistrationBean(this, this.properties.getServer().getPath());
@ -147,15 +154,6 @@ public class FhirAutoConfiguration {
customize();
}
private void customize() {
if (this.customizers != null) {
AnnotationAwareOrderComparator.sort(this.customizers);
for (FhirRestfulServerCustomizer customizer : this.customizers) {
customizer.customize(this);
}
}
}
}
@Configuration
@ -164,6 +162,23 @@ public class FhirAutoConfiguration {
@EnableConfigurationProperties(FhirProperties.class)
static class FhirJpaServerConfiguration {
@Bean()
@ConditionalOnMissingBean
public ScheduledExecutorFactoryBean scheduledExecutorService() {
ScheduledExecutorFactoryBean b = new ScheduledExecutorFactoryBean();
b.setPoolSize(5);
return b;
}
@Bean
@ConditionalOnMissingBean
public AsyncTaskExecutor taskScheduler() {
ConcurrentTaskScheduler retVal = new ConcurrentTaskScheduler();
retVal.setConcurrentExecutor(scheduledExecutorService().getObject());
retVal.setScheduledExecutor(scheduledExecutorService().getObject());
return retVal;
}
@Configuration
@EntityScan("ca.uhn.fhir.jpa.entity")
@EnableJpaRepositories(basePackages = "ca.uhn.fhir.jpa.dao.data")
@ -176,6 +191,7 @@ public class FhirAutoConfiguration {
DaoConfig fhirDaoConfig = new DaoConfig();
return fhirDaoConfig;
}
}
@Configuration