Subscription matcher transactions (#1229)

* fix bug in ResourceIndexSearchParamQuantity
Move transaction boundary around SubscriptionMatcherInterceptor so it can be overridden in subclass
Lots of CRLF changes from a mvn -P DIST

* FIXME
This commit is contained in:
Ken Stevens 2019-03-06 16:12:04 -05:00 committed by James Agnew
parent fd401165a9
commit a2d4c93922
6 changed files with 33 additions and 35 deletions

View File

@ -105,31 +105,15 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType) { private void submitResourceModified(IBaseResource theNewResource, ResourceModifiedMessage.OperationTypeEnum theOperationType) {
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType); ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theNewResource, theOperationType);
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED
/* if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, msg)) {
* We only want to submit the message to the processing queue once the return;
* transaction is committed. We do this in order to make sure that the
* data is actually in the DB, in case it's the database matcher.
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public int getOrder() {
return 0;
} }
@Override
public void afterCommit() {
submitResourceModified(msg);
}
});
} else {
submitResourceModified(msg); submitResourceModified(msg);
} }
} protected void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
private void sendToProcessingChannel(final ResourceModifiedMessage theMessage) {
ourLog.trace("Sending resource modified message to processing channel"); ourLog.trace("Sending resource modified message to processing channel");
Validate.notNull(myProcessingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it."); Validate.notNull(myProcessingChannel, "A SubscriptionMatcherInterceptor has been registered without calling start() on it.");
myProcessingChannel.send(new ResourceModifiedJsonMessage(theMessage)); myProcessingChannel.send(new ResourceModifiedJsonMessage(theMessage));
@ -144,13 +128,27 @@ public class SubscriptionMatcherInterceptor implements IResourceModifiedConsumer
*/ */
@Override @Override
public void submitResourceModified(final ResourceModifiedMessage theMsg) { public void submitResourceModified(final ResourceModifiedMessage theMsg) {
// Interceptor call: SUBSCRIPTION_RESOURCE_MODIFIED /*
if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MODIFIED, theMsg)) { * We only want to submit the message to the processing queue once the
return; * transaction is committed. We do this in order to make sure that the
* data is actually in the DB, in case it's the database matcher.
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public int getOrder() {
return 0;
} }
@Override
public void afterCommit() {
sendToProcessingChannel(theMsg); sendToProcessingChannel(theMsg);
} }
});
} else {
sendToProcessingChannel(theMsg);
}
}
@VisibleForTesting @VisibleForTesting
LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() { LinkedBlockingQueueSubscribableChannel getProcessingChannelForUnitTest() {

View File

@ -8,7 +8,6 @@ import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage; import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum; import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.rest.param.*; import ca.uhn.fhir.rest.param.*;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IAnyResource; import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
@ -29,8 +28,8 @@ import static org.junit.Assert.*;
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {TestR4Config.class}) @ContextConfiguration(classes = {TestR4Config.class})
public class InMemorySubscriptionMatcherTestR4 { public class InMemorySubscriptionMatcherR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(InMemorySubscriptionMatcherTestR4.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(InMemorySubscriptionMatcherR4Test.class);
@Autowired @Autowired
InMemorySubscriptionMatcher myInMemorySubscriptionMatcher; InMemorySubscriptionMatcher myInMemorySubscriptionMatcher;

View File

@ -36,6 +36,7 @@ import java.math.BigDecimal;
import java.util.Objects; import java.util.Objects;
import static org.apache.commons.lang3.StringUtils.defaultString; import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.apache.commons.lang3.StringUtils.isBlank;
//@formatter:off //@formatter:off
@Embeddable @Embeddable
@ -246,7 +247,7 @@ public class ResourceIndexedSearchParamQuantity extends BaseResourceIndexedSearc
// Only match on system if it wasn't specified // Only match on system if it wasn't specified
String quantityUnitsString = defaultString(quantity.getUnits()); String quantityUnitsString = defaultString(quantity.getUnits());
if (quantity.getSystem() == null && quantityUnitsString == null) { if (quantity.getSystem() == null && isBlank(quantityUnitsString)) {
if (Objects.equals(getValue(),quantity.getValue())) { if (Objects.equals(getValue(),quantity.getValue())) {
retval = true; retval = true;
} }
@ -257,7 +258,7 @@ public class ResourceIndexedSearchParamQuantity extends BaseResourceIndexedSearc
Objects.equals(getValue(),quantity.getValue())) { Objects.equals(getValue(),quantity.getValue())) {
retval = true; retval = true;
} }
} else if (quantityUnitsString == null) { } else if (isBlank(quantityUnitsString)) {
if (getSystem().equalsIgnoreCase(quantity.getSystem()) && if (getSystem().equalsIgnoreCase(quantity.getSystem()) &&
Objects.equals(getValue(),quantity.getValue())) { Objects.equals(getValue(),quantity.getValue())) {
retval = true; retval = true;

View File

@ -131,7 +131,7 @@ public class InterceptorService implements IInterceptorRegistry, IInterceptorBro
*/ */
private List<HookInvoker> scanInterceptorForHookMethods(Object theInterceptor, int theTypeOrder) { private List<HookInvoker> scanInterceptorForHookMethods(Object theInterceptor, int theTypeOrder) {
ArrayList<HookInvoker> retVal = new ArrayList<>(); ArrayList<HookInvoker> retVal = new ArrayList<>();
for (Method nextMethod : theInterceptor.getClass().getDeclaredMethods()) { for (Method nextMethod : theInterceptor.getClass().getMethods()) {
Hook hook = AnnotationUtils.findAnnotation(nextMethod, Hook.class); Hook hook = AnnotationUtils.findAnnotation(nextMethod, Hook.class);
if (hook != null) { if (hook != null) {

View File

@ -14,7 +14,7 @@ import java.util.Collections;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class InMemorySubscriptionMatcherTestR3 extends BaseSubscriptionDstu3Test { public class InMemorySubscriptionMatcherR3Test extends BaseSubscriptionDstu3Test {
@Autowired @Autowired
SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator; SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator;
@Autowired @Autowired