Properly purge subscriptions

This commit is contained in:
James Agnew 2017-01-11 18:19:46 -05:00
parent fe24841350
commit 5b7abf15dc
6 changed files with 71 additions and 22 deletions

View File

@ -66,7 +66,7 @@ import ca.uhn.fhir.rest.server.Constants;
import ca.uhn.fhir.rest.server.IBundleProvider; import ca.uhn.fhir.rest.server.IBundleProvider;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subscription>implements IFhirResourceDaoSubscription<Subscription> { public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subscription> implements IFhirResourceDaoSubscription<Subscription> {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoSubscriptionDstu2.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoSubscriptionDstu2.class);
@ -125,11 +125,16 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subsc
// SubscriptionCandidateResource // SubscriptionCandidateResource
Collection<Long> subscriptions = mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date());
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
Collection<Long> subscriptions = txTemplate.execute(new TransactionCallback<Collection<Long>>() {
@Override
public Collection<Long> doInTransaction(TransactionStatus theStatus) {
return mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date());
}
});
int retVal = 0; int retVal = 0;
for (final Long nextSubscriptionTablePid : subscriptions) { for (final Long nextSubscriptionTablePid : subscriptions) {
retVal += txTemplate.execute(new TransactionCallback<Integer>() { retVal += txTemplate.execute(new TransactionCallback<Integer>() {
@ -238,15 +243,24 @@ public class FhirResourceDaoSubscriptionDstu2 extends FhirResourceDaoDstu2<Subsc
return; return;
} }
Date cutoff = new Date(System.currentTimeMillis() - purgeInactiveAfterMillis); final Date cutoff = new Date(System.currentTimeMillis() - purgeInactiveAfterMillis);
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
Collection<SubscriptionTable> toPurge = txTemplate.execute(new TransactionCallback<Collection<SubscriptionTable>>() {
@Override
public Collection<SubscriptionTable> doInTransaction(TransactionStatus theStatus) {
Collection<SubscriptionTable> toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff); Collection<SubscriptionTable> toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff);
toPurge.size();
return toPurge;
}
});
for (SubscriptionTable subscriptionTable : toPurge) { for (SubscriptionTable subscriptionTable : toPurge) {
final IdDt subscriptionId = subscriptionTable.getSubscriptionResource().getIdDt(); final IdDt subscriptionId = subscriptionTable.getSubscriptionResource().getIdDt();
ourLog.info("Deleting inactive subscription {} - Created {}, last client poll {}", ourLog.info("Deleting inactive subscription {} - Created {}, last client poll {}",
new Object[] { subscriptionId.toUnqualified(), subscriptionTable.getCreated(), subscriptionTable.getLastClientPoll() }); new Object[] { subscriptionId.toUnqualified(), subscriptionTable.getCreated(), subscriptionTable.getLastClientPoll() });
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallback<Void>() { txTemplate.execute(new TransactionCallback<Void>() {
@Override @Override
public Void doInTransaction(TransactionStatus theStatus) { public Void doInTransaction(TransactionStatus theStatus) {

View File

@ -130,11 +130,16 @@ public class FhirResourceDaoSubscriptionDstu3 extends FhirResourceDaoDstu3<Subsc
// SubscriptionCandidateResource // SubscriptionCandidateResource
Collection<Long> subscriptions = mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date());
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
Collection<Long> subscriptions = txTemplate.execute(new TransactionCallback<Collection<Long>>() {
@Override
public Collection<Long> doInTransaction(TransactionStatus theStatus) {
return mySubscriptionTableDao.findSubscriptionsWhichNeedToBeChecked(SubscriptionStatusEnum.ACTIVE.getCode(), new Date());
}
});
int retVal = 0; int retVal = 0;
for (final Long nextSubscriptionTablePid : subscriptions) { for (final Long nextSubscriptionTablePid : subscriptions) {
retVal += txTemplate.execute(new TransactionCallback<Integer>() { retVal += txTemplate.execute(new TransactionCallback<Integer>() {
@ -244,15 +249,24 @@ public class FhirResourceDaoSubscriptionDstu3 extends FhirResourceDaoDstu3<Subsc
return; return;
} }
Date cutoff = new Date(System.currentTimeMillis() - purgeInactiveAfterMillis); TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
final Date cutoff = new Date(System.currentTimeMillis() - purgeInactiveAfterMillis);
Collection<SubscriptionTable> toPurge = txTemplate.execute(new TransactionCallback<Collection<SubscriptionTable>>() {
@Override
public Collection<SubscriptionTable> doInTransaction(TransactionStatus theStatus) {
Collection<SubscriptionTable> toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff); Collection<SubscriptionTable> toPurge = mySubscriptionTableDao.findInactiveBeforeCutoff(cutoff);
toPurge.size();
return toPurge;
}
});
for (SubscriptionTable subscriptionTable : toPurge) { for (SubscriptionTable subscriptionTable : toPurge) {
final IdDt subscriptionId = subscriptionTable.getSubscriptionResource().getIdDt(); final IdDt subscriptionId = subscriptionTable.getSubscriptionResource().getIdDt();
ourLog.info("Deleting inactive subscription {} - Created {}, last client poll {}", ourLog.info("Deleting inactive subscription {} - Created {}, last client poll {}",
new Object[] { subscriptionId.toUnqualified(), subscriptionTable.getCreated(), subscriptionTable.getLastClientPoll() }); new Object[] { subscriptionId.toUnqualified(), subscriptionTable.getCreated(), subscriptionTable.getLastClientPoll() });
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallback<Void>() { txTemplate.execute(new TransactionCallback<Void>() {
@Override @Override
public Void doInTransaction(TransactionStatus theStatus) { public Void doInTransaction(TransactionStatus theStatus) {

View File

@ -131,6 +131,21 @@
<artifactId>jetty-servlet</artifactId> <artifactId>jetty-servlet</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.eclipse.jetty</groupId> <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId> <artifactId>jetty-server</artifactId>

View File

@ -11,11 +11,13 @@ import org.hibernate.jpa.HibernatePersistenceProvider;
import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.EnableTransactionManagement;
import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu2; import ca.uhn.fhir.jpa.config.BaseJavaConfigDstu2;
import ca.uhn.fhir.jpa.config.WebsocketDstu2Config;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.util.SubscriptionsRequireManualActivationInterceptorDstu2; import ca.uhn.fhir.jpa.util.SubscriptionsRequireManualActivationInterceptorDstu2;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor; import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
@ -24,6 +26,7 @@ import ca.uhn.fhir.rest.server.interceptor.ResponseHighlighterInterceptor;
@Configuration @Configuration
@EnableTransactionManagement() @EnableTransactionManagement()
//@Import(WebsocketDstu2Config.class)
public class FhirServerConfig extends BaseJavaConfigDstu2 { public class FhirServerConfig extends BaseJavaConfigDstu2 {
/** /**

View File

@ -7,7 +7,6 @@ import javax.sql.DataSource;
import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.dialect.DerbyTenSevenDialect;
import org.hibernate.dialect.PostgreSQL94Dialect; import org.hibernate.dialect.PostgreSQL94Dialect;
import org.hibernate.jpa.HibernatePersistenceProvider; import org.hibernate.jpa.HibernatePersistenceProvider;
import org.springframework.beans.factory.annotation.Autowire; import org.springframework.beans.factory.annotation.Autowire;

View File

@ -175,6 +175,10 @@
CLI example uploader couldn't find STU3 examples after CI server CLI example uploader couldn't find STU3 examples after CI server
was moved to build.fhir.org was moved to build.fhir.org
</action> </action>
<action type="fix">
Fix issue in JPA subscription module that prevented purging stale
subscriptions when many were present on Postgres
</action>
</release> </release>
<release version="2.1" date="2016-11-11"> <release version="2.1" date="2016-11-11">
<action type="add"> <action type="add">