From 5b7abf15dca072ddd9e57c4e1d70d4356f0f416f Mon Sep 17 00:00:00 2001 From: James Agnew Date: Wed, 11 Jan 2017 18:19:46 -0500 Subject: [PATCH] Properly purge subscriptions --- .../dao/FhirResourceDaoSubscriptionDstu2.java | 44 ++++++++++++------- .../FhirResourceDaoSubscriptionDstu3.java | 26 ++++++++--- hapi-fhir-jpaserver-example/pom.xml | 15 +++++++ .../uhn/fhir/jpa/demo/FhirServerConfig.java | 3 ++ .../uhn/fhirtest/config/TestDstu2Config.java | 1 - src/changes/changes.xml | 4 ++ 6 files changed, 71 insertions(+), 22 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoSubscriptionDstu2.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoSubscriptionDstu2.java index 18ca6c8e8c2..ed596e5cb11 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoSubscriptionDstu2.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/FhirResourceDaoSubscriptionDstu2.java @@ -10,7 +10,7 @@ package ca.uhn.fhir.jpa.dao; * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -66,7 +66,7 @@ import ca.uhn.fhir.rest.server.Constants; import ca.uhn.fhir.rest.server.IBundleProvider; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; -public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2implements IFhirResourceDaoSubscription { +public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2 implements IFhirResourceDaoSubscription { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoSubscriptionDstu2.class); @@ -98,7 +98,7 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2 getUndeliveredResourcesAndPurge(Long theSubscriptionPid) { List retVal = new ArrayList(); @@ -125,11 +125,16 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2 subscriptions = mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date()); - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - + + Collection subscriptions = txTemplate.execute(new TransactionCallback>() { + @Override + public Collection doInTransaction(TransactionStatus theStatus) { + return mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date()); + } + }); + int retVal = 0; for (final Long nextSubscriptionTablePid : subscriptions) { retVal += txTemplate.execute(new TransactionCallback() { @@ -140,7 +145,7 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2 toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff); + final Date cutoff = new Date(System.currentTimeMillis() - purgeInactiveAfterMillis); + TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); + txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + + Collection toPurge = txTemplate.execute(new TransactionCallback>() { + @Override + public Collection doInTransaction(TransactionStatus theStatus) { + Collection toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff); + toPurge.size(); + return toPurge; + } + }); + for (SubscriptionTable subscriptionTable : toPurge) { final IdDt subscriptionId = subscriptionTable.getSubscriptionResource().getIdDt(); ourLog.info("Deleting inactive subscription {} - Created {}, last client poll {}", new Object[] { subscriptionId.toUnqualified(), subscriptionTable.getCreated(), subscriptionTable.getLastClientPoll() }); - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); txTemplate.execute(new TransactionCallback() { @Override public Void doInTransaction(TransactionStatus theStatus) { diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoSubscriptionDstu3.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoSubscriptionDstu3.java index 5f2c8048d05..a9dd03f87ff 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoSubscriptionDstu3.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/dstu3/FhirResourceDaoSubscriptionDstu3.java @@ -130,11 +130,16 @@ public class FhirResourceDaoSubscriptionDstu3 extends FhirResourceDaoDstu3 subscriptions = mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date()); - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + Collection subscriptions = txTemplate.execute(new TransactionCallback>() { + @Override + public Collection doInTransaction(TransactionStatus theStatus) { + return mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date()); + } + }); + int retVal = 0; for (final Long nextSubscriptionTablePid : subscriptions) { retVal += txTemplate.execute(new TransactionCallback() { @@ -244,15 +249,24 @@ public class FhirResourceDaoSubscriptionDstu3 extends FhirResourceDaoDstu3 toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff); + TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); + txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + final Date cutoff = new Date(System.currentTimeMillis() - purgeInactiveAfterMillis); + + Collection toPurge = txTemplate.execute(new TransactionCallback>() { + @Override + public Collection doInTransaction(TransactionStatus theStatus) { + Collection toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff); + toPurge.size(); + return toPurge; + } + }); + for (SubscriptionTable subscriptionTable : toPurge) { final IdDt subscriptionId = subscriptionTable.getSubscriptionResource().getIdDt(); ourLog.info("Deleting inactive subscription {} - Created {}, last client poll {}", new Object[] { subscriptionId.toUnqualified(), subscriptionTable.getCreated(), subscriptionTable.getLastClientPoll() }); - TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); txTemplate.execute(new TransactionCallback() { @Override public Void doInTransaction(TransactionStatus theStatus) { diff --git a/hapi-fhir-jpaserver-example/pom.xml b/hapi-fhir-jpaserver-example/pom.xml index 71aac1b565e..44bd920cb91 100644 --- a/hapi-fhir-jpaserver-example/pom.xml +++ b/hapi-fhir-jpaserver-example/pom.xml @@ -131,6 +131,21 @@ jetty-servlet test + + org.eclipse.jetty.websocket + websocket-api + test + + + org.eclipse.jetty.websocket + websocket-client + test + + + org.eclipse.jetty.websocket + websocket-server + test + org.eclipse.jetty jetty-server diff --git a/hapi-fhir-jpaserver-example/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java b/hapi-fhir-jpaserver-example/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java index a2e06509102..5db5a32247b 100644 --- a/hapi-fhir-jpaserver-example/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java +++ b/hapi-fhir-jpaserver-example/src/main/java/ca/uhn/fhir/jpa/demo/FhirServerConfig.java @@ -11,11 +11,13 @@ import org.hibernate.jpa.HibernatePersistenceProvider; import org.springframework.beans.factory.annotation.Autowire; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.transaction.annotation.EnableTransactionManagement; import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu2; +import ca.uhn.fhir.jpa.config.WebsocketDstu2Config; import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.util.SubscriptionsRequireManualActivationInterceptorDstu2; import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor; @@ -24,6 +26,7 @@ import ca.uhn.fhir.rest.server.interceptor.ResponseHighlighterInterceptor; @Configuration @EnableTransactionManagement() +//@Import(WebsocketDstu2Config.class) public class FhirServerConfig extends BaseJavaConfigDstu2 { /** diff --git a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java index ee0d6d03569..f8616d48121 100644 --- a/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java +++ b/hapi-fhir-jpaserver-uhnfhirtest/src/main/java/ca/uhn/fhirtest/config/TestDstu2Config.java @@ -7,7 +7,6 @@ import javax.sql.DataSource; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.lang3.time.DateUtils; -import org.hibernate.dialect.DerbyTenSevenDialect; import org.hibernate.dialect.PostgreSQL94Dialect; import org.hibernate.jpa.HibernatePersistenceProvider; import org.springframework.beans.factory.annotation.Autowire; diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 9f0b7929c52..a8b7f016e28 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -175,6 +175,10 @@ CLI example uploader couldn't find STU3 examples after CI server was moved to build.fhir.org + + Fix issue in JPA subscription module that prevented purging stale + subscriptions when many were present on Postgres +