From 755060f91e2cfc5eb1801932fd6a0d1b9064a72f Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 15:29:41 -0400 Subject: [PATCH 01/10] Allow subscription triggering on a block of resources --- .../java/ca/uhn/fhir/context/FhirContext.java | 6 +- .../java/ca/uhn/fhir/util/DatatypeUtil.java | 9 +- .../fhir/jpa/dao/BaseHapiFhirResourceDao.java | 19 + .../ca/uhn/fhir/jpa/dao/IFhirResourceDao.java | 10 +- .../SubscriptionTriggeringProvider.java | 353 ++++++++++++++++-- .../search/PersistedJpaBundleProvider.java | 2 +- .../jpa/search/warm/CacheWarmingSvcImpl.java | 19 +- .../java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java | 4 +- .../dstu3/BaseResourceProviderDstu3Test.java | 2 + .../SubscriptionTriggeringDstu3Test.java | 224 ++++++++++- 10 files changed, 599 insertions(+), 49 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java index 86514b5ae53..467c73d9cc5 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java @@ -442,8 +442,6 @@ public class FhirContext { * * @throws DataFormatException If the resource name is not known */ - // Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException being - // thrown by this method, don't change that. public RuntimeResourceDefinition getResourceDefinition(String theResourceName) throws DataFormatException { validateInitialized(); Validate.notBlank(theResourceName, "theResourceName must not be blank"); @@ -454,6 +452,10 @@ public class FhirContext { if (retVal == null) { Class clazz = myNameToResourceType.get(resourceName.toLowerCase()); if (clazz == null) { + // *********************************************************************** + // Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException + // being thrown by this method, don't change that. + // *********************************************************************** throw new DataFormatException(createUnknownResourceNameError(theResourceName, myVersion.getVersion())); } if (IBaseResource.class.isAssignableFrom(clazz)) { diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java index 4c03ac87ded..3624e14956b 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.util; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ package ca.uhn.fhir.util; import org.hl7.fhir.instance.model.api.IPrimitiveType; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -35,7 +36,7 @@ public class DatatypeUtil { HashSet retVal = new HashSet<>(); if (theStringList != null) { for (IPrimitiveType string : theStringList) { - if (string != null && string.getValue()!=null) { + if (string != null && string.getValue() != null) { retVal.add(string.getValueAsString()); } } @@ -44,7 +45,7 @@ public class DatatypeUtil { } /** - * Joins a list of strings with a single space (' ') between each string + * Joins a list of strings with a single space (' ') between each string */ public static String joinStringsSpaceSeparated(List> theStrings) { StringBuilder b = new StringBuilder(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 73dcf594fe5..d5cc0ce599c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -832,6 +832,25 @@ public abstract class BaseHapiFhirResourceDao extends B return processMatchUrl(theMatchUrl, getResourceType()); } + @Override + public IBaseResource readByPid(Long thePid) { + StopWatch w = new StopWatch(); + + Optional entity = myResourceTableDao.findById(thePid); + if (!entity.isPresent()) { + throw new ResourceNotFoundException("No resource found with PID " + thePid); + } + if (entity.get().getDeleted() != null) { + throw new ResourceGoneException("Resource was deleted at " + new InstantType(entity.get().getDeleted()).getValueAsString()); + } + + T retVal = toResource(myResourceType, entity.get(), null, false); + + ourLog.debug("Processed read on {} in {}ms", thePid, w.getMillis()); + return retVal; + } + + @Override public T read(IIdType theId) { return read(theId, null); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java index 51d7ac83894..99604fbf412 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -158,6 +158,11 @@ public interface IFhirResourceDao extends IDao { */ T read(IIdType theId); + /** + * Read a resource by its internal PID + */ + IBaseResource readByPid(Long thePid); + /** * @param theId * @param theRequestDetails TODO @@ -239,6 +244,7 @@ public interface IFhirResourceDao extends IDao { RuntimeResourceDefinition validateCriteriaAndReturnResourceDefinition(String criteria); + // /** // * Invoke the everything operation // */ diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index 8c29aa2d903..85029c80baa 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.provider; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,64 +21,163 @@ package ca.uhn.fhir.jpa.provider; */ import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; +import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.dao.SearchParameterMap; +import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage; import ca.uhn.fhir.jpa.util.JpaConstants; import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.OperationParam; +import ca.uhn.fhir.rest.api.CacheControlDirective; +import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.param.StringParam; import ca.uhn.fhir.rest.param.UriParam; import ca.uhn.fhir.rest.server.IResourceProvider; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.util.ParametersUtil; +import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.ValidateUtil; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.IdType; import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.scheduling.annotation.Scheduled; -import java.util.List; +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; -public class SubscriptionTriggeringProvider implements IResourceProvider { +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +public class SubscriptionTriggeringProvider implements IResourceProvider, ApplicationContextAware { public static final String RESOURCE_ID = "resourceId"; + public static final int DEFAULT_MAX_SUBMIT = 10000; + public static final String SEARCH_URL = "searchUrl"; + private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class); + private final List myActiveJobs = new ArrayList<>(); @Autowired private FhirContext myFhirContext; @Autowired - private IFhirSystemDao mySystemDao; - @Autowired(required = false) + private DaoRegistry myDaoRegistry; private List> mySubscriptionInterceptorList; + private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT; + @Autowired + private ISearchCoordinatorSvc mySearchCoordinatorSvc; + private ApplicationContext myAppCtx; - @Operation(name= JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + /** + * Sets the maximum number of resources that will be submitted in a single pass + */ + public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) { + Integer maxSubmitPerPass = theMaxSubmitPerPass; + if (maxSubmitPerPass == null) { + maxSubmitPerPass = DEFAULT_MAX_SUBMIT; + } + Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0"); + myMaxSubmitPerPass = maxSubmitPerPass; + } + + @SuppressWarnings("unchecked") + @PostConstruct + public void start() { + mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList()); + mySubscriptionInterceptorList = new ArrayList<>(); + Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values(); + Collection> values = (Collection>) values1; + mySubscriptionInterceptorList.addAll(values); + } + + @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + public IBaseParameters triggerSubscription( + @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List theResourceIds, + @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List theSearchUrls + ) { + return doTriggerSubscription(theResourceIds, theSearchUrls, null); + } + + @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) public IBaseParameters triggerSubscription( @IdParam IIdType theSubscriptionId, - @OperationParam(name= RESOURCE_ID) UriParam theResourceId) { + @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List theResourceIds, + @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List theSearchUrls + ) { - ValidateUtil.isTrueOrThrowInvalidRequest(theResourceId != null, RESOURCE_ID + " parameter not provided"); - IdType resourceId = new IdType(theResourceId.getValue()); - ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type"); - ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part"); + // Throw a 404 if the subscription doesn't exist + IFhirResourceDao subscriptionDao = myDaoRegistry.getResourceDao("Subscription"); + IIdType subscriptionId = theSubscriptionId; + if (subscriptionId.hasResourceType() == false) { + subscriptionId = subscriptionId.withResourceType("Subscription"); + } + subscriptionDao.read(subscriptionId); - Class resourceType = myFhirContext.getResourceDefinition(resourceId.getResourceType()).getImplementingClass(); - IFhirResourceDao dao = mySystemDao.getDao(resourceType); - IBaseResource resourceToTrigger = dao.read(resourceId); + return doTriggerSubscription(theResourceIds, theSearchUrls, subscriptionId); - ResourceModifiedMessage msg = new ResourceModifiedMessage(); - msg.setId(resourceToTrigger.getIdElement()); - msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE); - msg.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); - msg.setNewPayload(myFhirContext, resourceToTrigger); + } - for (BaseSubscriptionInterceptor next :mySubscriptionInterceptorList) { - next.submitResourceModified(msg); + private IBaseParameters doTriggerSubscription(@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List theResourceIds, @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List theSearchUrls, @IdParam IIdType theSubscriptionId) { + if (mySubscriptionInterceptorList.isEmpty()) { + throw new PreconditionFailedException("Subscription processing not active on this server"); } + List resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList()); + List searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList()); + + // Make sure we have at least one resource ID or search URL + if (resourceIds.size() == 0 && searchUrls.size() == 0) { + throw new InvalidRequestException("No resource IDs or search URLs specified for triggering"); + } + + // Resource URLs must be compete + for (UriParam next : resourceIds) { + IdType resourceId = new IdType(next.getValue()); + ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type"); + ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part"); + } + + // Search URLs must be valid + for (StringParam next : searchUrls) { + if (next.getValue().contains("?") == false) { + throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")"); + } + } + + SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails(); + jobDetails.setJobId(UUID.randomUUID().toString()); + jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList())); + jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList())); + if (theSubscriptionId != null) { + jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue()); + } + + // Submit job for processing + synchronized (myActiveJobs) { + myActiveJobs.add(jobDetails); + } + + // Create a parameters response IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext); IPrimitiveType value = (IPrimitiveType) myFhirContext.getElementDefinition("string").newInstance(); - value.setValueAsString("Triggered resource " + theResourceId.getValue() + " for subscription"); + value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId); ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value); return retVal; } @@ -87,4 +186,210 @@ public class SubscriptionTriggeringProvider implements IResourceProvider { public Class getResourceType() { return myFhirContext.getResourceDefinition("Subscription").getImplementingClass(); } + + @Scheduled(fixedDelay = DateUtils.MILLIS_PER_SECOND) + public void runDeliveryPass() { + + synchronized (myActiveJobs) { + if (myActiveJobs.isEmpty()) { + return; + } + + SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); + + runJob(activeJob); + + // If the job is complete, remove it from the queue + if (activeJob.getRemainingResourceIds().isEmpty()) { + if (activeJob.getRemainingSearchUrls().isEmpty()) { + if (isBlank(activeJob.myCurrentSearchUuid)) { + myActiveJobs.remove(0); + String remainingJobsMsg = ""; + if (myActiveJobs.size() > 0) { + remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)"; + } + ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg); + } + } + } + + } + + } + + private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { + StopWatch sw = new StopWatch(); + + // Submit individual resources + int totalSubmitted = 0; + while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) { + totalSubmitted++; + String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0); + submitResource(theJobDetails.getSubscriptionId(), nextResourceId); + } + + // If we don't have an active search started, and one needs to be.. start it + if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) { + String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0); + RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl); + String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?')); + String resourceType = resourceDef.getName(); + + IFhirResourceDao callingDao = myDaoRegistry.getResourceDao(resourceType); + SearchParameterMap params = BaseHapiFhirDao.translateMatchUrl(callingDao, myFhirContext, queryPart, resourceDef); + + ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl); + + IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective()); + theJobDetails.setCurrentSearchUuid(search.getUuid()); + theJobDetails.setCurrentSearchResourceType(resourceType); + theJobDetails.setCurrentSearchCount(params.getCount()); + } + + // If we have an active search going, submit resources from it + if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { + int fromIndex = 0; + if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) { + fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; + } + + IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); + + int maxQuerySize = myMaxSubmitPerPass - totalSubmitted; + int toIndex = fromIndex + maxQuerySize; + if (theJobDetails.getCurrentSearchCount() != null) { + toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); + } + ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid()); + List resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); + for (Long next : resourceIds) { + IBaseResource nextResource = resourceDao.readByPid(next); + submitResource(theJobDetails.getSubscriptionId(), nextResource); + totalSubmitted++; + theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1); + } + + int expectedCount = toIndex - fromIndex; + if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { + ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); + theJobDetails.setCurrentSearchResourceType(null); + theJobDetails.setCurrentSearchUuid(null); + theJobDetails.setCurrentSearchLastUploadedIndex(null); + theJobDetails.setCurrentSearchCount(null); + } + } + + ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); + } + + private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { + org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); + IBaseResource resourceToTrigger = dao.read(resourceId); + + submitResource(theSubscriptionId, resourceToTrigger); + } + + private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) { + + ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); + + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + msg.setId(theResourceToTrigger.getIdElement()); + msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE); + msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue()); + msg.setNewPayload(myFhirContext, theResourceToTrigger); + + for (BaseSubscriptionInterceptor next : mySubscriptionInterceptorList) { + next.submitResourceModified(msg); + } + } + + public void cancelAll() { + synchronized (myActiveJobs) { + myActiveJobs.clear(); + } + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + myAppCtx = applicationContext; + } + + private static class SubscriptionTriggeringJobDetails { + + private String myJobId; + private String mySubscriptionId; + private List myRemainingResourceIds; + private List myRemainingSearchUrls; + private String myCurrentSearchUuid; + private Integer myCurrentSearchCount; + private String myCurrentSearchResourceType; + private Integer myCurrentSearchLastUploadedIndex; + + public Integer getCurrentSearchCount() { + return myCurrentSearchCount; + } + + public void setCurrentSearchCount(Integer theCurrentSearchCount) { + myCurrentSearchCount = theCurrentSearchCount; + } + + public String getCurrentSearchResourceType() { + return myCurrentSearchResourceType; + } + + public void setCurrentSearchResourceType(String theCurrentSearchResourceType) { + myCurrentSearchResourceType = theCurrentSearchResourceType; + } + + public String getJobId() { + return myJobId; + } + + public void setJobId(String theJobId) { + myJobId = theJobId; + } + + public String getSubscriptionId() { + return mySubscriptionId; + } + + public void setSubscriptionId(String theSubscriptionId) { + mySubscriptionId = theSubscriptionId; + } + + public List getRemainingResourceIds() { + return myRemainingResourceIds; + } + + public void setRemainingResourceIds(List theRemainingResourceIds) { + myRemainingResourceIds = theRemainingResourceIds; + } + + public List getRemainingSearchUrls() { + return myRemainingSearchUrls; + } + + public void setRemainingSearchUrls(List theRemainingSearchUrls) { + myRemainingSearchUrls = theRemainingSearchUrls; + } + + public String getCurrentSearchUuid() { + return myCurrentSearchUuid; + } + + public void setCurrentSearchUuid(String theCurrentSearchUuid) { + myCurrentSearchUuid = theCurrentSearchUuid; + } + + public Integer getCurrentSearchLastUploadedIndex() { + return myCurrentSearchLastUploadedIndex; + } + + public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) { + myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; + } + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java index 330c7672791..9a34dde5ecb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java @@ -117,7 +117,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider { results = query.getResultList(); - ArrayList retVal = new ArrayList(); + ArrayList retVal = new ArrayList<>(); for (ResourceHistoryTable next : results) { BaseHasResource resource; resource = next; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java index 667c87a3016..bfca9c536e4 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.dao.*; +import ca.uhn.fhir.parser.DataFormatException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -68,7 +69,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { private void refreshNow(WarmCacheEntry theCacheEntry) { String nextUrl = theCacheEntry.getUrl(); - RuntimeResourceDefinition resourceDef = parseWarmUrlResourceType(nextUrl); + RuntimeResourceDefinition resourceDef = parseUrlResourceType(myCtx, nextUrl); IFhirResourceDao callingDao = myDaoRegistry.getResourceDao(resourceDef.getName()); String queryPart = parseWarmUrlParamPart(nextUrl); SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(callingDao, myCtx, queryPart, resourceDef); @@ -84,14 +85,18 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { return theNextUrl.substring(paramIndex); } - private RuntimeResourceDefinition parseWarmUrlResourceType(String theNextUrl) { - int paramIndex = theNextUrl.indexOf('?'); - String resourceName = theNextUrl.substring(0, paramIndex); + /** + * TODO: this method probably belongs in a utility class, not here + * + * @throws DataFormatException If the resource type is not known + */ + public static RuntimeResourceDefinition parseUrlResourceType(FhirContext theCtx, String theUrl) throws DataFormatException { + int paramIndex = theUrl.indexOf('?'); + String resourceName = theUrl.substring(0, paramIndex); if (resourceName.contains("/")) { resourceName = resourceName.substring(resourceName.lastIndexOf('/') + 1); } - RuntimeResourceDefinition resourceDef = myCtx.getResourceDefinition(resourceName); - return resourceDef; + return theCtx.getResourceDefinition(resourceName); } @PostConstruct @@ -107,7 +112,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { // Validate parseWarmUrlParamPart(next.getUrl()); - parseWarmUrlResourceType(next.getUrl()); + parseUrlResourceType(myCtx, next.getUrl()); myCacheEntryToNextRefresh.put(next, 0L); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java index 65da5f5820d..6ec7358cd29 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java @@ -401,14 +401,14 @@ public abstract class BaseJpaTest { public static void waitForSize(int theTarget, List theList) { StopWatch sw = new StopWatch(); - while (theList.size() != theTarget && sw.getMillis() <= 15000) { + while (theList.size() != theTarget && sw.getMillis() <= 16000) { try { Thread.sleep(50); } catch (InterruptedException theE) { throw new Error(theE); } } - if (sw.getMillis() >= 15000) { + if (sw.getMillis() >= 16000) { String describeResults = theList .stream() .map(t -> { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java index fdec1636947..32c1648a882 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java @@ -66,6 +66,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test { protected static ISearchCoordinatorSvc mySearchCoordinatorSvc; private static Server ourServer; private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider; + protected static SubscriptionTriggeringProvider ourSubscriptionTriggeringProvider; public BaseResourceProviderDstu3Test() { super(); @@ -160,6 +161,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test { ourRestHookSubscriptionInterceptor = wac.getBean(SubscriptionRestHookInterceptor.class); ourEmailSubscriptionInterceptor = wac.getBean(SubscriptionEmailInterceptor.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryDstu3.class); + ourSubscriptionTriggeringProvider = wac.getBean(SubscriptionTriggeringProvider.class); myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000); ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index 561cd231988..252e9dbee0b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -13,6 +13,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.util.PortUtil; import com.google.common.collect.Lists; import org.eclipse.jetty.server.Server; @@ -27,7 +28,10 @@ import javax.servlet.http.HttpServletRequest; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Test the rest-hook subscriptions @@ -36,12 +40,14 @@ import static org.junit.Assert.assertEquals; public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Test { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionTriggeringDstu3Test.class); - private static List ourCreatedObservations = Lists.newArrayList(); private static int ourListenerPort; private static RestfulServer ourListenerRestServer; private static Server ourListenerServer; private static String ourListenerServerBase; + private static List ourCreatedObservations = Lists.newArrayList(); private static List ourUpdatedObservations = Lists.newArrayList(); + private static List ourCreatedPatients = Lists.newArrayList(); + private static List ourUpdatedPatients = Lists.newArrayList(); private static List ourContentTypes = new ArrayList<>(); private List mySubscriptionIds = new ArrayList<>(); @@ -63,6 +69,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor); + ourSubscriptionTriggeringProvider.cancelAll(); + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null); } @Before @@ -70,10 +78,15 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor); } + /** + * Only do counter resets here! We call this inside tests + */ @Before public void beforeReset() { ourCreatedObservations.clear(); ourUpdatedObservations.clear(); + ourCreatedPatients.clear(); + ourUpdatedPatients.clear(); ourContentTypes.clear(); } @@ -90,7 +103,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te subscription.setChannel(channel); MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); - subscription.setId(methodOutcome.getId().getIdPart()); + subscription.setId(methodOutcome.getId()); mySubscriptionIds.add(methodOutcome.getId()); waitForQueueToDrain(); @@ -116,6 +129,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te return observation; } + @Test public void testTriggerResourceToSpecificSubscription() throws Exception { String payload = "application/fhir+json"; @@ -145,7 +159,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te .execute(); String responseValue = response.getParameter().get(0).getValue().primitiveValue(); - assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); waitForQueueToDrain(); waitForSize(0, ourCreatedObservations); @@ -153,6 +167,173 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te } + @Test + public void testTriggerUsingMultipleSearches() throws Exception { + String payload = "application/fhir+json"; + IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); + IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); + + // Create lots + for (int i = 0; i < 50; i++) { + Patient p = new Patient(); + p.addName().setFamily("P" + i); + ourClient.create().resource(p).execute(); + } + for (int i = 0; i < 50; i++) { + Observation o = new Observation(); + o.setId("O" + i); + o.setStatus(Observation.ObservationStatus.FINAL); + o.getCode().setText("O" + i); + ourClient.update().resource(o).execute(); + } + + waitForSize(50, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(50, ourUpdatedPatients); + beforeReset(); + + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onInstance(sub1id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?")) + .andParameter(SubscriptionTriggeringProvider.RESOURCE_ID, new UriType("Observation/O2")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + response = ourClient + .operation() + .onInstance(sub2id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?")) + .execute(); + responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + waitForSize(51, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(50, ourUpdatedPatients); + + } + + @Test + public void testTriggerUsingSearchesWithCount() throws Exception { + String payload = "application/fhir+json"; + IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); + IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); + + // Create lots + for (int i = 0; i < 50; i++) { + Patient p = new Patient(); + p.addName().setFamily("P" + i); + ourClient.create().resource(p).execute(); + } + for (int i = 0; i < 50; i++) { + Observation o = new Observation(); + o.setId("O" + i); + o.setStatus(Observation.ObservationStatus.FINAL); + o.getCode().setText("O" + i); + ourClient.update().resource(o).execute(); + } + + waitForSize(50, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(50, ourUpdatedPatients); + beforeReset(); + + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onInstance(sub1id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?_count=10")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + response = ourClient + .operation() + .onInstance(sub2id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?_count=16")) + .execute(); + responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + waitForSize(10, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(16, ourUpdatedPatients); + + } + + @Test + public void testTriggerUsingInvalidSearchUrl() { + + try { + ourClient + .operation() + .onType(Subscription.class) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation")) + .execute(); + fail(); + } catch (InvalidRequestException e) { + assertEquals("HTTP 400 Bad Request: Search URL is not valid (must be in the form \"[resource type]?[optional params]\")", e.getMessage()); + } + } + + @Test + public void testTriggerAllSubscriptions() throws Exception { + String payload = "application/fhir+json"; + IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); + IdType sub2id = createSubscription("Observation?status=final", payload, ourListenerServerBase).getIdElement(); + + for (int i = 0; i < 50; i++) { + Observation o = new Observation(); + o.setId("O" + i); + o.setStatus(Observation.ObservationStatus.FINAL); + o.getCode().setText("O" + i); + ourClient.update().resource(o).execute(); + } + + waitForSize(100, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(0, ourUpdatedPatients); + beforeReset(); + + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onType(Subscription.class) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + waitForSize(100, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(0, ourUpdatedPatients); + + } + @Test public void testTriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception { String payload = "application/fhir+json"; @@ -172,8 +353,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te waitForSize(1, ourUpdatedObservations); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); - ourClient.registerInterceptor(new LoggingInterceptor(true)); - Parameters response = ourClient .operation() .onInstance(subscriptionId) @@ -182,7 +361,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te .execute(); String responseValue = response.getParameter().get(0).getValue().primitiveValue(); - assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); waitForQueueToDrain(); waitForSize(0, ourCreatedObservations); @@ -191,6 +370,11 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te } + @Override + protected boolean shouldLogClient() { + return false; + } + private void waitForQueueToDrain() throws InterruptedException { RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor); } @@ -220,6 +404,31 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te } + public static class PatientListener implements IResourceProvider { + + @Create + public MethodOutcome create(@ResourceParam Patient thePatient, HttpServletRequest theRequest) { + ourLog.info("Received Listener Create"); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourCreatedPatients.add(thePatient); + return new MethodOutcome(new IdType("Patient/1"), true); + } + + @Override + public Class getResourceType() { + return Patient.class; + } + + @Update + public MethodOutcome update(@ResourceParam Patient thePatient, HttpServletRequest theRequest) { + ourUpdatedPatients.add(thePatient); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedPatients.size()); + return new MethodOutcome(new IdType("Patient/1"), false); + } + + } + @BeforeClass public static void startListenerServer() throws Exception { ourListenerPort = PortUtil.findFreePort(); @@ -227,7 +436,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; ObservationListener obsListener = new ObservationListener(); - ourListenerRestServer.setResourceProviders(obsListener); + PatientListener ptListener = new PatientListener(); + ourListenerRestServer.setResourceProviders(obsListener, ptListener); ourListenerServer = new Server(ourListenerPort); From 0fbe897ca42085e1093ec865916ef32509f1a08c Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 17:24:17 -0400 Subject: [PATCH 02/10] Add named beans to JPA config --- .../src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java index 0a7b14be309..fee87d32661 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/BaseConfig.java @@ -70,7 +70,7 @@ public abstract class BaseConfig implements SchedulingConfigurer { @Autowired protected Environment myEnv; - @Bean + @Bean(name = "myDaoRegistry") public DaoRegistry daoRegistry() { return new DaoRegistry(); } @@ -141,13 +141,13 @@ public abstract class BaseConfig implements SchedulingConfigurer { return b.getObject(); } - @Bean + @Bean(name="mySubscriptionTriggeringProvider") @Lazy - public SubscriptionTriggeringProvider mySubscriptionTriggeringProvider() { + public SubscriptionTriggeringProvider subscriptionTriggeringProvider() { return new SubscriptionTriggeringProvider(); } - @Bean(autowire = Autowire.BY_TYPE) + @Bean(autowire = Autowire.BY_TYPE, name = "mySearchCoordinatorSvc") public ISearchCoordinatorSvc searchCoordinatorSvc() { return new SearchCoordinatorSvcImpl(); } From 4e252063b81bc8e4eda69149d9497127d90516a8 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 19:54:55 -0400 Subject: [PATCH 03/10] Allow transactions and batches with transaction permission in AuthorizationInterceptor --- .../interceptor/auth/IAuthRuleBuilderRule.java | 3 ++- .../rest/server/interceptor/auth/RuleBuilder.java | 12 ++++++++++-- .../auth/AuthorizationInterceptorDstu2Test.java | 2 +- .../AuthorizationInterceptorDstu3Test.java | 2 +- .../interceptor/AuthorizationInterceptorR4Test.java | 4 ++-- src/changes/changes.xml | 5 +++++ 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/IAuthRuleBuilderRule.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/IAuthRuleBuilderRule.java index 80e68a7725d..4f68b723377 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/IAuthRuleBuilderRule.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/IAuthRuleBuilderRule.java @@ -83,7 +83,8 @@ public interface IAuthRuleBuilderRule { /** * This rule applies to the FHIR transaction operation. Transaction is a special - * case in that it bundles other operations + * case in that it bundles other operations. This permission also allows FHIR + * batch to be performed. */ IAuthRuleBuilderRuleTransaction transaction(); diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/RuleBuilder.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/RuleBuilder.java index bc0ead08d80..058a85e0c12 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/RuleBuilder.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/interceptor/auth/RuleBuilder.java @@ -223,7 +223,6 @@ public class RuleBuilder implements IAuthRuleBuilder { @Override public IAuthRuleBuilderRuleTransaction transaction() { - myRuleOp = RuleOpEnum.TRANSACTION; return new RuleBuilderRuleTransaction(); } @@ -520,11 +519,20 @@ public class RuleBuilder implements IAuthRuleBuilder { @Override public IAuthRuleBuilderRuleOpClassifierFinished andApplyNormalRules() { + // Allow transaction RuleImplOp rule = new RuleImplOp(myRuleName); rule.setMode(myRuleMode); - rule.setOp(myRuleOp); + rule.setOp(RuleOpEnum.TRANSACTION); rule.setTransactionAppliesToOp(TransactionAppliesToEnum.ANY_OPERATION); myRules.add(rule); + + // Allow batch + rule = new RuleImplOp(myRuleName); + rule.setMode(myRuleMode); + rule.setOp(RuleOpEnum.BATCH); + rule.setTransactionAppliesToOp(TransactionAppliesToEnum.ANY_OPERATION); + myRules.add(rule); + return new RuleBuilderFinished(rule); } diff --git a/hapi-fhir-structures-dstu2/src/test/java/ca/uhn/fhir/rest/server/interceptor/auth/AuthorizationInterceptorDstu2Test.java b/hapi-fhir-structures-dstu2/src/test/java/ca/uhn/fhir/rest/server/interceptor/auth/AuthorizationInterceptorDstu2Test.java index 3fd96f88573..fe533d76b7c 100644 --- a/hapi-fhir-structures-dstu2/src/test/java/ca/uhn/fhir/rest/server/interceptor/auth/AuthorizationInterceptorDstu2Test.java +++ b/hapi-fhir-structures-dstu2/src/test/java/ca/uhn/fhir/rest/server/interceptor/auth/AuthorizationInterceptorDstu2Test.java @@ -262,7 +262,7 @@ public class AuthorizationInterceptorDstu2Test { httpPost.setEntity(createFhirResourceEntity(input)); status = ourClient.execute(httpPost); extractResponseAndClose(status); - assertEquals(403, status.getStatusLine().getStatusCode()); + assertEquals(200, status.getStatusLine().getStatusCode()); } @Test diff --git a/hapi-fhir-structures-dstu3/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorDstu3Test.java b/hapi-fhir-structures-dstu3/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorDstu3Test.java index 217204e8489..e985aa452b4 100644 --- a/hapi-fhir-structures-dstu3/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorDstu3Test.java +++ b/hapi-fhir-structures-dstu3/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorDstu3Test.java @@ -495,7 +495,7 @@ public class AuthorizationInterceptorDstu3Test { httpPost.setEntity(createFhirResourceEntity(input)); status = ourClient.execute(httpPost); extractResponseAndClose(status); - assertEquals(403, status.getStatusLine().getStatusCode()); + assertEquals(200, status.getStatusLine().getStatusCode()); } @Test diff --git a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorR4Test.java b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorR4Test.java index 34cce97509d..d1fcb352631 100644 --- a/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorR4Test.java +++ b/hapi-fhir-structures-r4/src/test/java/ca/uhn/fhir/rest/server/interceptor/AuthorizationInterceptorR4Test.java @@ -469,7 +469,7 @@ public class AuthorizationInterceptorR4Test { } @Test - public void testBatchWhenOnlyTransactionAllowed() throws Exception { + public void testBatchAllowed() throws Exception { ourServlet.registerInterceptor(new AuthorizationInterceptor(PolicyEnum.DENY) { @Override public List buildRuleList(RequestDetails theRequestDetails) { @@ -498,7 +498,7 @@ public class AuthorizationInterceptorR4Test { httpPost.setEntity(createFhirResourceEntity(input)); status = ourClient.execute(httpPost); extractResponseAndClose(status); - assertEquals(403, status.getStatusLine().getStatusCode()); + assertEquals(200, status.getStatusLine().getStatusCode()); } @Test diff --git a/src/changes/changes.xml b/src/changes/changes.xml index ce8791e6c1c..e06eb146ba0 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -95,6 +95,11 @@ When using the testpage overlay to delete a resource, currently a crash can occur if an unqualified ID is placed in the ID text box. This has been corrected. + + AuthorizationInterceptor did not allow FHIR batch operations when the transaction() + permission is granted. This has been corrected so that transaction() allows both + batch and transaction requests to proceed. + From 1c1f601332b94efea43409f2abd51eb5b5f6d92c Mon Sep 17 00:00:00 2001 From: jamesagnew Date: Tue, 16 Oct 2018 19:56:52 -0400 Subject: [PATCH 04/10] License header updates --- .../src/main/java/ca/uhn/fhir/util/DatatypeUtil.java | 4 ++-- .../src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java | 4 ++-- .../uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java index 3624e14956b..528b32d720a 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.util; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java index 99604fbf412..d0eb43b7dd2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index 85029c80baa..5b1aa53129f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.provider; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From 5d7142a972a0b38de521febd697e7baab92c28a9 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 20:23:32 -0400 Subject: [PATCH 05/10] Avoid intermittent test failures --- .../fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index 252e9dbee0b..966c7263760 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -314,7 +314,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te waitForSize(0, ourUpdatedPatients); beforeReset(); - ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(50); ourClient.registerInterceptor(new LoggingInterceptor(true)); From 388bda1fecc15f332fe949cd6716ea74b3f277f3 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 20:31:43 -0400 Subject: [PATCH 06/10] Avoid transaction isolation level not supported by Oracle --- .../java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java | 2 +- .../java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java index 9a34dde5ecb..8df28bcc5ac 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java @@ -157,7 +157,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider { TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); - txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED); + txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); return txTemplate.execute(s -> { try { setSearchEntity(mySearchDao.findByUuid(myUuid)); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java index 020e8ce35f8..6071163c34f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/SearchCoordinatorSvcImpl.java @@ -633,7 +633,7 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc { txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); if (myCustomIsolationSupported) { - txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED); + txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); } txTemplate.execute(new TransactionCallbackWithoutResult() { From 2c05d9c5dbb847753fd5f2a002aa5c55ad4ec6aa Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 21:51:33 -0400 Subject: [PATCH 07/10] Test cleanup --- .../jpa/subscription/SubscriptionTriggeringDstu3Test.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index 966c7263760..b426cc342fd 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -300,7 +300,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); IdType sub2id = createSubscription("Observation?status=final", payload, ourListenerServerBase).getIdElement(); - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 10; i++) { Observation o = new Observation(); o.setId("O" + i); o.setStatus(Observation.ObservationStatus.FINAL); @@ -308,7 +308,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourClient.update().resource(o).execute(); } - waitForSize(100, ourUpdatedObservations); + waitForSize(20, ourUpdatedObservations); waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedPatients); waitForSize(0, ourUpdatedPatients); @@ -327,7 +327,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te String responseValue = response.getParameter().get(0).getValue().primitiveValue(); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); - waitForSize(100, ourUpdatedObservations); + waitForSize(20, ourUpdatedObservations); waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedPatients); waitForSize(0, ourUpdatedPatients); From 8130700d686ae9c8f9aa3b6f890df06930beef05 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Wed, 17 Oct 2018 05:53:07 -0400 Subject: [PATCH 08/10] One more optimization to the subscription retriggering logic --- .../SubscriptionTriggeringProvider.java | 31 +++++++++++-------- .../SubscriptionTriggeringDstu3Test.java | 6 ++++ 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index 85029c80baa..7efa6b6dadf 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -195,6 +195,9 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic return; } + String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", ")); + ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds); + SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); runJob(activeJob); @@ -219,6 +222,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { StopWatch sw = new StopWatch(); + ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId()); // Submit individual resources int totalSubmitted = 0; @@ -244,14 +248,13 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic theJobDetails.setCurrentSearchUuid(search.getUuid()); theJobDetails.setCurrentSearchResourceType(resourceType); theJobDetails.setCurrentSearchCount(params.getCount()); + theJobDetails.setCurrentSearchCount(null); + theJobDetails.setCurrentSearchLastUploadedIndex(-1); } // If we have an active search going, submit resources from it if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { - int fromIndex = 0; - if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) { - fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; - } + int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); @@ -260,26 +263,28 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic if (theJobDetails.getCurrentSearchCount() != null) { toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); } - ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid()); + ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); List resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); + + ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); for (Long next : resourceIds) { IBaseResource nextResource = resourceDao.readByPid(next); submitResource(theJobDetails.getSubscriptionId(), nextResource); totalSubmitted++; - theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1); + theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1); } int expectedCount = toIndex - fromIndex; - if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { - ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); + if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { + ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); theJobDetails.setCurrentSearchResourceType(null); theJobDetails.setCurrentSearchUuid(null); - theJobDetails.setCurrentSearchLastUploadedIndex(null); + theJobDetails.setCurrentSearchLastUploadedIndex(-1); theJobDetails.setCurrentSearchCount(null); } } - ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); + ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); } private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { @@ -325,7 +330,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic private String myCurrentSearchUuid; private Integer myCurrentSearchCount; private String myCurrentSearchResourceType; - private Integer myCurrentSearchLastUploadedIndex; + private int myCurrentSearchLastUploadedIndex; public Integer getCurrentSearchCount() { return myCurrentSearchCount; @@ -383,11 +388,11 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic myCurrentSearchUuid = theCurrentSearchUuid; } - public Integer getCurrentSearchLastUploadedIndex() { + public int getCurrentSearchLastUploadedIndex() { return myCurrentSearchLastUploadedIndex; } - public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) { + public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) { myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index b426cc342fd..0fe6d1c959b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -71,6 +71,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourSubscriptionTriggeringProvider.cancelAll(); ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null); + + myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds()); } @Before @@ -169,6 +171,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te @Test public void testTriggerUsingMultipleSearches() throws Exception { + myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(13, 22, 100)); + String payload = "application/fhir+json"; IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); @@ -216,6 +220,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te responseValue = response.getParameter().get(0).getValue().primitiveValue(); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); +// Thread.sleep(1000000000); + waitForSize(51, ourUpdatedObservations); waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedPatients); From b220154e9cf86f215a95539a8d0ab991b7368908 Mon Sep 17 00:00:00 2001 From: James Agnew Date: Wed, 17 Oct 2018 07:02:11 -0400 Subject: [PATCH 09/10] Fix error in retriggeer logic --- .../ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index 15eaff67ac0..c352bab7edc 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -248,7 +248,6 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic theJobDetails.setCurrentSearchUuid(search.getUuid()); theJobDetails.setCurrentSearchResourceType(resourceType); theJobDetails.setCurrentSearchCount(params.getCount()); - theJobDetails.setCurrentSearchCount(null); theJobDetails.setCurrentSearchLastUploadedIndex(-1); } From 4b5dcce021943322d4761dd041438cd7e538e10f Mon Sep 17 00:00:00 2001 From: James Agnew Date: Wed, 17 Oct 2018 08:32:57 -0400 Subject: [PATCH 10/10] Add a bit of logging to subscription triggering --- .../jpa/provider/SubscriptionTriggeringProvider.java | 1 + .../SubscriptionTriggeringDstu3Test.java | 12 ++---------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index c352bab7edc..a232e1aadf2 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -173,6 +173,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic synchronized (myActiveJobs) { myActiveJobs.add(jobDetails); } + ourLog.info("Subscription triggering requested for {} resource and {} search - Gave job ID: {}", resourceIds.size(), searchUrls.size(), jobDetails.getJobId()); // Create a parameters response IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index 0fe6d1c959b..23383af682b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -62,8 +62,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te myDaoConfig.setAllowMultipleDelete(true); ourLog.info("Deleting all subscriptions"); - ourClient.delete().resourceConditionalByUrl("Subscription?status=active").execute(); - ourClient.delete().resourceConditionalByUrl("Observation?code:missing=false").execute(); + ourClient.delete().resourceConditionalByUrl("Subscription?_lastUpdated=lt3000").execute(); + ourClient.delete().resourceConditionalByUrl("Observation?_lastUpdated=lt3000").execute(); ourLog.info("Done deleting all subscriptions"); myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete()); @@ -151,8 +151,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te waitForSize(1, ourUpdatedObservations); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); - ourClient.registerInterceptor(new LoggingInterceptor(true)); - Parameters response = ourClient .operation() .onInstance(subscriptionId) @@ -199,8 +197,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); - ourClient.registerInterceptor(new LoggingInterceptor(true)); - Parameters response = ourClient .operation() .onInstance(sub1id) @@ -257,8 +253,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); - ourClient.registerInterceptor(new LoggingInterceptor(true)); - Parameters response = ourClient .operation() .onInstance(sub1id) @@ -322,8 +316,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(50); - ourClient.registerInterceptor(new LoggingInterceptor(true)); - Parameters response = ourClient .operation() .onType(Subscription.class)