Subscription logic cleanup

This commit is contained in:
James Agnew 2018-08-13 08:40:29 -04:00
parent 086b95013e
commit 39ef79ff2c
5 changed files with 30 additions and 23 deletions

View File

@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHandler;
@ -378,7 +377,7 @@ public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> exten
for (IBaseResource resource : resourceList) { for (IBaseResource resource : resourceList) {
String nextId = resource.getIdElement().getIdPart(); String nextId = resource.getIdElement().getIdPart();
allIds.add(nextId); allIds.add(nextId);
mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(resource); mySubscriptionActivatingSubscriber.activateOrRegisterSubscriptionIfRequired(resource);
} }
unregisterAllSubscriptionsNotInCollection(allIds); unregisterAllSubscriptionsNotInCollection(allIds);

View File

@ -70,8 +70,7 @@ public class SubscriptionActivatingSubscriber {
Validate.notNull(theTaskExecutor); Validate.notNull(theTaskExecutor);
} }
public void activateAndRegisterSubscriptionIfRequired(final IBaseResource theSubscription) { public synchronized void activateOrRegisterSubscriptionIfRequired(final IBaseResource theSubscription) {
// Grab the value for "Subscription.channel.type" so we can see if this // Grab the value for "Subscription.channel.type" so we can see if this
// subscriber applies.. // subscriber applies..
String subscriptionChannelType = myCtx String subscriptionChannelType = myCtx
@ -128,21 +127,25 @@ public class SubscriptionActivatingSubscriber {
} else if (activeStatus.equals(statusString)) { } else if (activeStatus.equals(statusString)) {
registerSubscriptionUnlessAlreadyRegistered(theSubscription); registerSubscriptionUnlessAlreadyRegistered(theSubscription);
} else { } else {
if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { // Status isn't "active" or "requested"
ourLog.info("Removing {} subscription {}", statusString, theSubscription.getIdElement().toUnqualified().getValue()); unregisterSubscriptionIfRegistered(theSubscription, statusString);
mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
} }
} }
protected void unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
mySubscriptionInterceptor.unregisterSubscription(theSubscription.getIdElement());
}
} }
private void activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) { private void activateSubscription(String theActiveStatus, final IBaseResource theSubscription, String theRequestedStatus) {
IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement()); IBaseResource subscription = mySubscriptionDao.read(theSubscription.getIdElement());
ourLog.info("Activating and registering subscription {} from status {} to {} for channel {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus, myChannelType); ourLog.info("Activating and subscription {} from status {} to {} for channel {}", subscription.getIdElement().toUnqualified().getValue(), theRequestedStatus, theActiveStatus, myChannelType);
try { try {
SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus); SubscriptionUtil.setStatus(myCtx, subscription, theActiveStatus);
mySubscriptionDao.update(subscription); mySubscriptionDao.update(subscription);
registerSubscriptionUnlessAlreadyRegistered(subscription);
} catch (final UnprocessableEntityException e) { } catch (final UnprocessableEntityException e) {
ourLog.info("Changing status of {} to ERROR", subscription.getIdElement()); ourLog.info("Changing status of {} to ERROR", subscription.getIdElement());
SubscriptionUtil.setStatus(myCtx, subscription, "error"); SubscriptionUtil.setStatus(myCtx, subscription, "error");
@ -151,6 +154,7 @@ public class SubscriptionActivatingSubscriber {
} }
} }
@SuppressWarnings("EnumSwitchStatementWhichMissesCases")
public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException { public void handleMessage(RestOperationTypeEnum theOperationType, IIdType theId, final IBaseResource theSubscription) throws MessagingException {
switch (theOperationType) { switch (theOperationType) {
@ -162,13 +166,7 @@ public class SubscriptionActivatingSubscriber {
if (!theId.getResourceType().equals("Subscription")) { if (!theId.getResourceType().equals("Subscription")) {
return; return;
} }
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); activateAndRegisterSubscriptionIfRequiredInTransaction(theSubscription);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
activateAndRegisterSubscriptionIfRequired(theSubscription);
}
});
break; break;
default: default:
break; break;
@ -176,6 +174,16 @@ public class SubscriptionActivatingSubscriber {
} }
private synchronized void activateAndRegisterSubscriptionIfRequiredInTransaction(IBaseResource theSubscription) {
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
activateOrRegisterSubscriptionIfRequired(theSubscription);
}
});
}
private void registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { private void registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) { if (mySubscriptionInterceptor.hasSubscription(theSubscription.getIdElement())) {
ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());