From 755060f91e2cfc5eb1801932fd6a0d1b9064a72f Mon Sep 17 00:00:00 2001 From: James Agnew Date: Tue, 16 Oct 2018 15:29:41 -0400 Subject: [PATCH] Allow subscription triggering on a block of resources --- .../java/ca/uhn/fhir/context/FhirContext.java | 6 +- .../java/ca/uhn/fhir/util/DatatypeUtil.java | 9 +- .../fhir/jpa/dao/BaseHapiFhirResourceDao.java | 19 + .../ca/uhn/fhir/jpa/dao/IFhirResourceDao.java | 10 +- .../SubscriptionTriggeringProvider.java | 353 ++++++++++++++++-- .../search/PersistedJpaBundleProvider.java | 2 +- .../jpa/search/warm/CacheWarmingSvcImpl.java | 19 +- .../java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java | 4 +- .../dstu3/BaseResourceProviderDstu3Test.java | 2 + .../SubscriptionTriggeringDstu3Test.java | 224 ++++++++++- 10 files changed, 599 insertions(+), 49 deletions(-) diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java index 86514b5ae53..467c73d9cc5 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java @@ -442,8 +442,6 @@ public class FhirContext { * * @throws DataFormatException If the resource name is not known */ - // Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException being - // thrown by this method, don't change that. public RuntimeResourceDefinition getResourceDefinition(String theResourceName) throws DataFormatException { validateInitialized(); Validate.notBlank(theResourceName, "theResourceName must not be blank"); @@ -454,6 +452,10 @@ public class FhirContext { if (retVal == null) { Class clazz = myNameToResourceType.get(resourceName.toLowerCase()); if (clazz == null) { + // *********************************************************************** + // Multiple spots in HAPI FHIR and Smile CDR depend on DataFormatException + // being thrown by this method, don't change that. + // *********************************************************************** throw new DataFormatException(createUnknownResourceNameError(theResourceName, myVersion.getVersion())); } if (IBaseResource.class.isAssignableFrom(clazz)) { diff --git a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java index 4c03ac87ded..3624e14956b 100644 --- a/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java +++ b/hapi-fhir-base/src/main/java/ca/uhn/fhir/util/DatatypeUtil.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.util; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -22,6 +22,7 @@ package ca.uhn.fhir.util; import org.hl7.fhir.instance.model.api.IPrimitiveType; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -35,7 +36,7 @@ public class DatatypeUtil { HashSet retVal = new HashSet<>(); if (theStringList != null) { for (IPrimitiveType string : theStringList) { - if (string != null && string.getValue()!=null) { + if (string != null && string.getValue() != null) { retVal.add(string.getValueAsString()); } } @@ -44,7 +45,7 @@ public class DatatypeUtil { } /** - * Joins a list of strings with a single space (' ') between each string + * Joins a list of strings with a single space (' ') between each string */ public static String joinStringsSpaceSeparated(List> theStrings) { StringBuilder b = new StringBuilder(); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 73dcf594fe5..d5cc0ce599c 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -832,6 +832,25 @@ public abstract class BaseHapiFhirResourceDao extends B return processMatchUrl(theMatchUrl, getResourceType()); } + @Override + public IBaseResource readByPid(Long thePid) { + StopWatch w = new StopWatch(); + + Optional entity = myResourceTableDao.findById(thePid); + if (!entity.isPresent()) { + throw new ResourceNotFoundException("No resource found with PID " + thePid); + } + if (entity.get().getDeleted() != null) { + throw new ResourceGoneException("Resource was deleted at " + new InstantType(entity.get().getDeleted()).getValueAsString()); + } + + T retVal = toResource(myResourceType, entity.get(), null, false); + + ourLog.debug("Processed read on {} in {}ms", thePid, w.getMillis()); + return retVal; + } + + @Override public T read(IIdType theId) { return read(theId, null); diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java index 51d7ac83894..99604fbf412 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/IFhirResourceDao.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -158,6 +158,11 @@ public interface IFhirResourceDao extends IDao { */ T read(IIdType theId); + /** + * Read a resource by its internal PID + */ + IBaseResource readByPid(Long thePid); + /** * @param theId * @param theRequestDetails TODO @@ -239,6 +244,7 @@ public interface IFhirResourceDao extends IDao { RuntimeResourceDefinition validateCriteriaAndReturnResourceDefinition(String criteria); + // /** // * Invoke the everything operation // */ diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index 8c29aa2d903..85029c80baa 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.provider; * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,64 +21,163 @@ package ca.uhn.fhir.jpa.provider; */ import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.context.RuntimeResourceDefinition; +import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; +import ca.uhn.fhir.jpa.dao.DaoRegistry; import ca.uhn.fhir.jpa.dao.IFhirResourceDao; -import ca.uhn.fhir.jpa.dao.IFhirSystemDao; +import ca.uhn.fhir.jpa.dao.SearchParameterMap; +import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc; +import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl; import ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor; import ca.uhn.fhir.jpa.subscription.ResourceModifiedMessage; import ca.uhn.fhir.jpa.util.JpaConstants; import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.Operation; import ca.uhn.fhir.rest.annotation.OperationParam; +import ca.uhn.fhir.rest.api.CacheControlDirective; +import ca.uhn.fhir.rest.api.server.IBundleProvider; +import ca.uhn.fhir.rest.param.StringParam; import ca.uhn.fhir.rest.param.UriParam; import ca.uhn.fhir.rest.server.IResourceProvider; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; +import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException; import ca.uhn.fhir.util.ParametersUtil; +import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.ValidateUtil; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.Validate; +import org.apache.commons.lang3.time.DateUtils; import org.hl7.fhir.instance.model.IdType; import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IPrimitiveType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.scheduling.annotation.Scheduled; -import java.util.List; +import javax.annotation.PostConstruct; +import java.util.*; +import 
java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-public class SubscriptionTriggeringProvider implements IResourceProvider {
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+public class SubscriptionTriggeringProvider implements IResourceProvider, ApplicationContextAware {
    public static final String RESOURCE_ID = "resourceId";
+   public static final int DEFAULT_MAX_SUBMIT = 10000;
+   public static final String SEARCH_URL = "searchUrl";
+   private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionTriggeringProvider.class);
+   private final List<SubscriptionTriggeringJobDetails> myActiveJobs = new ArrayList<>();
    @Autowired
    private FhirContext myFhirContext;
    @Autowired
-   private IFhirSystemDao mySystemDao;
-   @Autowired(required = false)
+   private DaoRegistry myDaoRegistry;
    private List<BaseSubscriptionInterceptor<?>> mySubscriptionInterceptorList;
+   private int myMaxSubmitPerPass = DEFAULT_MAX_SUBMIT;
+   @Autowired
+   private ISearchCoordinatorSvc mySearchCoordinatorSvc;
+   private ApplicationContext myAppCtx;
-   @Operation(name= JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
+   /**
+    * Sets the maximum number of resources that will be submitted in a single pass
+    */
+   public void setMaxSubmitPerPass(Integer theMaxSubmitPerPass) {
+      Integer maxSubmitPerPass = theMaxSubmitPerPass;
+      if (maxSubmitPerPass == null) {
+         maxSubmitPerPass = DEFAULT_MAX_SUBMIT;
+      }
+      Validate.isTrue(maxSubmitPerPass > 0, "theMaxSubmitPerPass must be > 0");
+      myMaxSubmitPerPass = maxSubmitPerPass;
+   }
+
+   @SuppressWarnings("unchecked")
+   @PostConstruct
+   public void start() {
+      mySubscriptionInterceptorList = ObjectUtils.defaultIfNull(mySubscriptionInterceptorList, Collections.emptyList());
+      mySubscriptionInterceptorList = new ArrayList<>();
+      Collection values1 = myAppCtx.getBeansOfType(BaseSubscriptionInterceptor.class).values();
+      Collection<BaseSubscriptionInterceptor<?>> values = (Collection<BaseSubscriptionInterceptor<?>>) values1;
+      mySubscriptionInterceptorList.addAll(values);
+   }
+
+   @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
+   public IBaseParameters triggerSubscription(
+      @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
+      @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
+   ) {
+      return doTriggerSubscription(theResourceIds, theSearchUrls, null);
+   }
+
+   @Operation(name = JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
    public IBaseParameters triggerSubscription(
       @IdParam IIdType theSubscriptionId,
-      @OperationParam(name= RESOURCE_ID) UriParam theResourceId) {
+      @OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds,
+      @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls
+   ) {
-      ValidateUtil.isTrueOrThrowInvalidRequest(theResourceId != null, RESOURCE_ID + " parameter not provided");
-      IdType resourceId = new IdType(theResourceId.getValue());
-      ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
-      ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
+      // Throw a 404 if the subscription doesn't exist
+      IFhirResourceDao subscriptionDao = myDaoRegistry.getResourceDao("Subscription");
+      IIdType subscriptionId = theSubscriptionId;
+      if (subscriptionId.hasResourceType() == false) {
+         subscriptionId = subscriptionId.withResourceType("Subscription");
+      }
+      subscriptionDao.read(subscriptionId);
-      Class<? extends IBaseResource> resourceType = myFhirContext.getResourceDefinition(resourceId.getResourceType()).getImplementingClass();
-      IFhirResourceDao dao = mySystemDao.getDao(resourceType);
-      IBaseResource resourceToTrigger = dao.read(resourceId);
+      return doTriggerSubscription(theResourceIds, theSearchUrls, subscriptionId);
-      ResourceModifiedMessage msg = new ResourceModifiedMessage();
-      msg.setId(resourceToTrigger.getIdElement());
-      msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE);
-      msg.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
-      msg.setNewPayload(myFhirContext, resourceToTrigger);
+   }
-      for (BaseSubscriptionInterceptor next :mySubscriptionInterceptorList) {
-         next.submitResourceModified(msg);
+   private IBaseParameters doTriggerSubscription(@OperationParam(name = RESOURCE_ID, min = 0, max = OperationParam.MAX_UNLIMITED) List<UriParam> theResourceIds, @OperationParam(name = SEARCH_URL, min = 0, max = OperationParam.MAX_UNLIMITED) List<StringParam> theSearchUrls, @IdParam IIdType theSubscriptionId) {
+      if (mySubscriptionInterceptorList.isEmpty()) {
+         throw new PreconditionFailedException("Subscription processing not active on this server");
       }
+      List<UriParam> resourceIds = ObjectUtils.defaultIfNull(theResourceIds, Collections.emptyList());
+      List<StringParam> searchUrls = ObjectUtils.defaultIfNull(theSearchUrls, Collections.emptyList());
+
+      // Make sure we have at least one resource ID or search URL
+      if (resourceIds.size() == 0 && searchUrls.size() == 0) {
+         throw new InvalidRequestException("No resource IDs or search URLs specified for triggering");
+      }
+
+      // Resource IDs must be complete
+      for (UriParam next : resourceIds) {
+         IdType resourceId = new IdType(next.getValue());
+         ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasResourceType(), RESOURCE_ID + " parameter must have resource type");
+         ValidateUtil.isTrueOrThrowInvalidRequest(resourceId.hasIdPart(), RESOURCE_ID + " parameter must have resource ID part");
+      }
+
+      // Search URLs must be valid
+      for (StringParam next : searchUrls) {
+         if (next.getValue().contains("?") == false) {
+            throw new InvalidRequestException("Search URL is not valid (must be in the form \"[resource type]?[optional params]\")");
+         }
+      }
+
+      SubscriptionTriggeringJobDetails jobDetails = new SubscriptionTriggeringJobDetails();
+      jobDetails.setJobId(UUID.randomUUID().toString());
+      jobDetails.setRemainingResourceIds(resourceIds.stream().map(UriParam::getValue).collect(Collectors.toList()));
+      jobDetails.setRemainingSearchUrls(searchUrls.stream().map(StringParam::getValue).collect(Collectors.toList()));
+      if (theSubscriptionId != null) {
+         jobDetails.setSubscriptionId(theSubscriptionId.toUnqualifiedVersionless().getValue());
+      }
+
+      // Submit job for processing
+      synchronized (myActiveJobs) {
+         myActiveJobs.add(jobDetails);
+      }
+
+      // Create a parameters response
      IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
      IPrimitiveType<String> value = (IPrimitiveType<String>) myFhirContext.getElementDefinition("string").newInstance();
-      value.setValueAsString("Triggered resource " + theResourceId.getValue() + " for subscription");
+      value.setValueAsString("Subscription triggering job submitted as JOB ID: " + jobDetails.myJobId);
      ParametersUtil.addParameterToParameters(myFhirContext, retVal, "information", value);
      return retVal;
   }
@@ -87,4 +186,210 @@ public class SubscriptionTriggeringProvider implements IResourceProvider {
    public Class<? extends IBaseResource> getResourceType() {
       return myFhirContext.getResourceDefinition("Subscription").getImplementingClass();
    }
+
+   @Scheduled(fixedDelay = 
DateUtils.MILLIS_PER_SECOND) + public void runDeliveryPass() { + + synchronized (myActiveJobs) { + if (myActiveJobs.isEmpty()) { + return; + } + + SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); + + runJob(activeJob); + + // If the job is complete, remove it from the queue + if (activeJob.getRemainingResourceIds().isEmpty()) { + if (activeJob.getRemainingSearchUrls().isEmpty()) { + if (isBlank(activeJob.myCurrentSearchUuid)) { + myActiveJobs.remove(0); + String remainingJobsMsg = ""; + if (myActiveJobs.size() > 0) { + remainingJobsMsg = "(" + myActiveJobs.size() + " jobs remaining)"; + } + ourLog.info("Subscription triggering job {} is complete{}", activeJob.getJobId(), remainingJobsMsg); + } + } + } + + } + + } + + private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { + StopWatch sw = new StopWatch(); + + // Submit individual resources + int totalSubmitted = 0; + while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted < myMaxSubmitPerPass) { + totalSubmitted++; + String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0); + submitResource(theJobDetails.getSubscriptionId(), nextResourceId); + } + + // If we don't have an active search started, and one needs to be.. start it + if (isBlank(theJobDetails.getCurrentSearchUuid()) && theJobDetails.getRemainingSearchUrls().size() > 0 && totalSubmitted < myMaxSubmitPerPass) { + String nextSearchUrl = theJobDetails.getRemainingSearchUrls().remove(0); + RuntimeResourceDefinition resourceDef = CacheWarmingSvcImpl.parseUrlResourceType(myFhirContext, nextSearchUrl); + String queryPart = nextSearchUrl.substring(nextSearchUrl.indexOf('?')); + String resourceType = resourceDef.getName(); + + IFhirResourceDao callingDao = myDaoRegistry.getResourceDao(resourceType); + SearchParameterMap params = BaseHapiFhirDao.translateMatchUrl(callingDao, myFhirContext, queryPart, resourceDef); + + ourLog.info("Triggering job[{}] is starting a search for {}", theJobDetails.getJobId(), nextSearchUrl); + + IBundleProvider search = mySearchCoordinatorSvc.registerSearch(callingDao, params, resourceType, new CacheControlDirective()); + theJobDetails.setCurrentSearchUuid(search.getUuid()); + theJobDetails.setCurrentSearchResourceType(resourceType); + theJobDetails.setCurrentSearchCount(params.getCount()); + } + + // If we have an active search going, submit resources from it + if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { + int fromIndex = 0; + if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) { + fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; + } + + IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); + + int maxQuerySize = myMaxSubmitPerPass - totalSubmitted; + int toIndex = fromIndex + maxQuerySize; + if (theJobDetails.getCurrentSearchCount() != null) { + toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); + } + ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid()); + List resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); + for (Long next : resourceIds) { + IBaseResource nextResource = resourceDao.readByPid(next); + submitResource(theJobDetails.getSubscriptionId(), nextResource); + totalSubmitted++; + theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1); + } + + int expectedCount = toIndex - 
fromIndex; + if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { + ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); + theJobDetails.setCurrentSearchResourceType(null); + theJobDetails.setCurrentSearchUuid(null); + theJobDetails.setCurrentSearchLastUploadedIndex(null); + theJobDetails.setCurrentSearchCount(null); + } + } + + ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); + } + + private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { + org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger); + IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType()); + IBaseResource resourceToTrigger = dao.read(resourceId); + + submitResource(theSubscriptionId, resourceToTrigger); + } + + private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) { + + ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId); + + ResourceModifiedMessage msg = new ResourceModifiedMessage(); + msg.setId(theResourceToTrigger.getIdElement()); + msg.setOperationType(ResourceModifiedMessage.OperationTypeEnum.UPDATE); + msg.setSubscriptionId(new IdType(theSubscriptionId).toUnqualifiedVersionless().getValue()); + msg.setNewPayload(myFhirContext, theResourceToTrigger); + + for (BaseSubscriptionInterceptor next : mySubscriptionInterceptorList) { + next.submitResourceModified(msg); + } + } + + public void cancelAll() { + synchronized (myActiveJobs) { + myActiveJobs.clear(); + } + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + myAppCtx = applicationContext; + } + + private static class SubscriptionTriggeringJobDetails { + + private String myJobId; + private String mySubscriptionId; + private List myRemainingResourceIds; + private List myRemainingSearchUrls; + private String myCurrentSearchUuid; + private Integer myCurrentSearchCount; + private String myCurrentSearchResourceType; + private Integer myCurrentSearchLastUploadedIndex; + + public Integer getCurrentSearchCount() { + return myCurrentSearchCount; + } + + public void setCurrentSearchCount(Integer theCurrentSearchCount) { + myCurrentSearchCount = theCurrentSearchCount; + } + + public String getCurrentSearchResourceType() { + return myCurrentSearchResourceType; + } + + public void setCurrentSearchResourceType(String theCurrentSearchResourceType) { + myCurrentSearchResourceType = theCurrentSearchResourceType; + } + + public String getJobId() { + return myJobId; + } + + public void setJobId(String theJobId) { + myJobId = theJobId; + } + + public String getSubscriptionId() { + return mySubscriptionId; + } + + public void setSubscriptionId(String theSubscriptionId) { + mySubscriptionId = theSubscriptionId; + } + + public List getRemainingResourceIds() { + return myRemainingResourceIds; + } + + public void setRemainingResourceIds(List theRemainingResourceIds) { + myRemainingResourceIds = theRemainingResourceIds; + } + + public List getRemainingSearchUrls() { + return myRemainingSearchUrls; + } + + public void setRemainingSearchUrls(List theRemainingSearchUrls) { + 
myRemainingSearchUrls = theRemainingSearchUrls; + } + + public String getCurrentSearchUuid() { + return myCurrentSearchUuid; + } + + public void setCurrentSearchUuid(String theCurrentSearchUuid) { + myCurrentSearchUuid = theCurrentSearchUuid; + } + + public Integer getCurrentSearchLastUploadedIndex() { + return myCurrentSearchLastUploadedIndex; + } + + public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) { + myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; + } + } + } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java index 330c7672791..9a34dde5ecb 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/PersistedJpaBundleProvider.java @@ -117,7 +117,7 @@ public class PersistedJpaBundleProvider implements IBundleProvider { results = query.getResultList(); - ArrayList retVal = new ArrayList(); + ArrayList retVal = new ArrayList<>(); for (ResourceHistoryTable next : results) { BaseHasResource resource; resource = next; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java index 667c87a3016..bfca9c536e4 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/search/warm/CacheWarmingSvcImpl.java @@ -24,6 +24,7 @@ import ca.uhn.fhir.context.ConfigurationException; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.RuntimeResourceDefinition; import ca.uhn.fhir.jpa.dao.*; +import ca.uhn.fhir.parser.DataFormatException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -68,7 +69,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { private void refreshNow(WarmCacheEntry theCacheEntry) { String nextUrl = theCacheEntry.getUrl(); - RuntimeResourceDefinition resourceDef = parseWarmUrlResourceType(nextUrl); + RuntimeResourceDefinition resourceDef = parseUrlResourceType(myCtx, nextUrl); IFhirResourceDao callingDao = myDaoRegistry.getResourceDao(resourceDef.getName()); String queryPart = parseWarmUrlParamPart(nextUrl); SearchParameterMap responseCriteriaUrl = BaseHapiFhirDao.translateMatchUrl(callingDao, myCtx, queryPart, resourceDef); @@ -84,14 +85,18 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { return theNextUrl.substring(paramIndex); } - private RuntimeResourceDefinition parseWarmUrlResourceType(String theNextUrl) { - int paramIndex = theNextUrl.indexOf('?'); - String resourceName = theNextUrl.substring(0, paramIndex); + /** + * TODO: this method probably belongs in a utility class, not here + * + * @throws DataFormatException If the resource type is not known + */ + public static RuntimeResourceDefinition parseUrlResourceType(FhirContext theCtx, String theUrl) throws DataFormatException { + int paramIndex = theUrl.indexOf('?'); + String resourceName = theUrl.substring(0, paramIndex); if (resourceName.contains("/")) { resourceName = resourceName.substring(resourceName.lastIndexOf('/') + 1); } - RuntimeResourceDefinition resourceDef = myCtx.getResourceDefinition(resourceName); - return resourceDef; + return 
theCtx.getResourceDefinition(resourceName); } @PostConstruct @@ -107,7 +112,7 @@ public class CacheWarmingSvcImpl implements ICacheWarmingSvc { // Validate parseWarmUrlParamPart(next.getUrl()); - parseWarmUrlResourceType(next.getUrl()); + parseUrlResourceType(myCtx, next.getUrl()); myCacheEntryToNextRefresh.put(next, 0L); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java index 65da5f5820d..6ec7358cd29 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/BaseJpaTest.java @@ -401,14 +401,14 @@ public abstract class BaseJpaTest { public static void waitForSize(int theTarget, List theList) { StopWatch sw = new StopWatch(); - while (theList.size() != theTarget && sw.getMillis() <= 15000) { + while (theList.size() != theTarget && sw.getMillis() <= 16000) { try { Thread.sleep(50); } catch (InterruptedException theE) { throw new Error(theE); } } - if (sw.getMillis() >= 15000) { + if (sw.getMillis() >= 16000) { String describeResults = theList .stream() .map(t -> { diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java index fdec1636947..32c1648a882 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/provider/dstu3/BaseResourceProviderDstu3Test.java @@ -66,6 +66,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test { protected static ISearchCoordinatorSvc mySearchCoordinatorSvc; private static Server ourServer; private TerminologyUploaderProviderDstu3 myTerminologyUploaderProvider; + protected static SubscriptionTriggeringProvider ourSubscriptionTriggeringProvider; public BaseResourceProviderDstu3Test() { super(); @@ -160,6 +161,7 @@ public abstract class BaseResourceProviderDstu3Test extends BaseJpaDstu3Test { ourRestHookSubscriptionInterceptor = wac.getBean(SubscriptionRestHookInterceptor.class); ourEmailSubscriptionInterceptor = wac.getBean(SubscriptionEmailInterceptor.class); ourSearchParamRegistry = wac.getBean(SearchParamRegistryDstu3.class); + ourSubscriptionTriggeringProvider = wac.getBean(SubscriptionTriggeringProvider.class); myFhirCtx.getRestfulClientFactory().setSocketTimeout(5000000); ourClient = myFhirCtx.newRestfulGenericClient(ourServerBase); diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index 561cd231988..252e9dbee0b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -13,6 +13,7 @@ import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor; import ca.uhn.fhir.rest.server.IResourceProvider; import ca.uhn.fhir.rest.server.RestfulServer; +import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.util.PortUtil; import com.google.common.collect.Lists; import org.eclipse.jetty.server.Server; @@ -27,7 +28,10 @@ import 
javax.servlet.http.HttpServletRequest; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Test the rest-hook subscriptions @@ -36,12 +40,14 @@ import static org.junit.Assert.assertEquals; public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Test { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(SubscriptionTriggeringDstu3Test.class); - private static List ourCreatedObservations = Lists.newArrayList(); private static int ourListenerPort; private static RestfulServer ourListenerRestServer; private static Server ourListenerServer; private static String ourListenerServerBase; + private static List ourCreatedObservations = Lists.newArrayList(); private static List ourUpdatedObservations = Lists.newArrayList(); + private static List ourCreatedPatients = Lists.newArrayList(); + private static List ourUpdatedPatients = Lists.newArrayList(); private static List ourContentTypes = new ArrayList<>(); private List mySubscriptionIds = new ArrayList<>(); @@ -63,6 +69,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourRestServer.unregisterInterceptor(ourRestHookSubscriptionInterceptor); + ourSubscriptionTriggeringProvider.cancelAll(); + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null); } @Before @@ -70,10 +78,15 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourRestServer.registerInterceptor(ourRestHookSubscriptionInterceptor); } + /** + * Only do counter resets here! We call this inside tests + */ @Before public void beforeReset() { ourCreatedObservations.clear(); ourUpdatedObservations.clear(); + ourCreatedPatients.clear(); + ourUpdatedPatients.clear(); ourContentTypes.clear(); } @@ -90,7 +103,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te subscription.setChannel(channel); MethodOutcome methodOutcome = ourClient.create().resource(subscription).execute(); - subscription.setId(methodOutcome.getId().getIdPart()); + subscription.setId(methodOutcome.getId()); mySubscriptionIds.add(methodOutcome.getId()); waitForQueueToDrain(); @@ -116,6 +129,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te return observation; } + @Test public void testTriggerResourceToSpecificSubscription() throws Exception { String payload = "application/fhir+json"; @@ -145,7 +159,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te .execute(); String responseValue = response.getParameter().get(0).getValue().primitiveValue(); - assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); waitForQueueToDrain(); waitForSize(0, ourCreatedObservations); @@ -153,6 +167,173 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te } + @Test + public void testTriggerUsingMultipleSearches() throws Exception { + String payload = "application/fhir+json"; + IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); + IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); + + // Create lots + for (int i = 0; i < 50; i++) { + Patient p = new Patient(); + p.addName().setFamily("P" + i); + 
ourClient.create().resource(p).execute(); + } + for (int i = 0; i < 50; i++) { + Observation o = new Observation(); + o.setId("O" + i); + o.setStatus(Observation.ObservationStatus.FINAL); + o.getCode().setText("O" + i); + ourClient.update().resource(o).execute(); + } + + waitForSize(50, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(50, ourUpdatedPatients); + beforeReset(); + + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onInstance(sub1id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?")) + .andParameter(SubscriptionTriggeringProvider.RESOURCE_ID, new UriType("Observation/O2")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + response = ourClient + .operation() + .onInstance(sub2id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?")) + .execute(); + responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + waitForSize(51, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(50, ourUpdatedPatients); + + } + + @Test + public void testTriggerUsingSearchesWithCount() throws Exception { + String payload = "application/fhir+json"; + IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); + IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); + + // Create lots + for (int i = 0; i < 50; i++) { + Patient p = new Patient(); + p.addName().setFamily("P" + i); + ourClient.create().resource(p).execute(); + } + for (int i = 0; i < 50; i++) { + Observation o = new Observation(); + o.setId("O" + i); + o.setStatus(Observation.ObservationStatus.FINAL); + o.getCode().setText("O" + i); + ourClient.update().resource(o).execute(); + } + + waitForSize(50, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(50, ourUpdatedPatients); + beforeReset(); + + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onInstance(sub1id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?_count=10")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + response = ourClient + .operation() + .onInstance(sub2id) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Patient?_count=16")) + .execute(); + responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); 
+ + waitForSize(10, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(16, ourUpdatedPatients); + + } + + @Test + public void testTriggerUsingInvalidSearchUrl() { + + try { + ourClient + .operation() + .onType(Subscription.class) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation")) + .execute(); + fail(); + } catch (InvalidRequestException e) { + assertEquals("HTTP 400 Bad Request: Search URL is not valid (must be in the form \"[resource type]?[optional params]\")", e.getMessage()); + } + } + + @Test + public void testTriggerAllSubscriptions() throws Exception { + String payload = "application/fhir+json"; + IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); + IdType sub2id = createSubscription("Observation?status=final", payload, ourListenerServerBase).getIdElement(); + + for (int i = 0; i < 50; i++) { + Observation o = new Observation(); + o.setId("O" + i); + o.setStatus(Observation.ObservationStatus.FINAL); + o.getCode().setText("O" + i); + ourClient.update().resource(o).execute(); + } + + waitForSize(100, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(0, ourUpdatedPatients); + beforeReset(); + + ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(33); + + ourClient.registerInterceptor(new LoggingInterceptor(true)); + + Parameters response = ourClient + .operation() + .onType(Subscription.class) + .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION) + .withParameter(Parameters.class, SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?")) + .execute(); + String responseValue = response.getParameter().get(0).getValue().primitiveValue(); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); + + waitForSize(100, ourUpdatedObservations); + waitForSize(0, ourCreatedObservations); + waitForSize(0, ourCreatedPatients); + waitForSize(0, ourUpdatedPatients); + + } + @Test public void testTriggerResourceToSpecificSubscriptionWhichDoesntMatch() throws Exception { String payload = "application/fhir+json"; @@ -172,8 +353,6 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te waitForSize(1, ourUpdatedObservations); assertEquals(Constants.CT_FHIR_JSON_NEW, ourContentTypes.get(0)); - ourClient.registerInterceptor(new LoggingInterceptor(true)); - Parameters response = ourClient .operation() .onInstance(subscriptionId) @@ -182,7 +361,7 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te .execute(); String responseValue = response.getParameter().get(0).getValue().primitiveValue(); - assertEquals("Triggered resource " + obsId.getValue() + " for subscription", responseValue); + assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); waitForQueueToDrain(); waitForSize(0, ourCreatedObservations); @@ -191,6 +370,11 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te } + @Override + protected boolean shouldLogClient() { + return false; + } + private void waitForQueueToDrain() throws InterruptedException { RestHookTestDstu2Test.waitForQueueToDrain(ourRestHookSubscriptionInterceptor); } @@ -220,6 +404,31 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te } + public static class PatientListener 
implements IResourceProvider { + + @Create + public MethodOutcome create(@ResourceParam Patient thePatient, HttpServletRequest theRequest) { + ourLog.info("Received Listener Create"); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourCreatedPatients.add(thePatient); + return new MethodOutcome(new IdType("Patient/1"), true); + } + + @Override + public Class getResourceType() { + return Patient.class; + } + + @Update + public MethodOutcome update(@ResourceParam Patient thePatient, HttpServletRequest theRequest) { + ourUpdatedPatients.add(thePatient); + ourContentTypes.add(theRequest.getHeader(Constants.HEADER_CONTENT_TYPE).replaceAll(";.*", "")); + ourLog.info("Received Listener Update (now have {} updates)", ourUpdatedPatients.size()); + return new MethodOutcome(new IdType("Patient/1"), false); + } + + } + @BeforeClass public static void startListenerServer() throws Exception { ourListenerPort = PortUtil.findFreePort(); @@ -227,7 +436,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourListenerServerBase = "http://localhost:" + ourListenerPort + "/fhir/context"; ObservationListener obsListener = new ObservationListener(); - ourListenerRestServer.setResourceProviders(obsListener); + PatientListener ptListener = new PatientListener(); + ourListenerRestServer.setResourceProviders(obsListener, ptListener); ourListenerServer = new Server(ourListenerPort);
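For reference, the new subscription-triggering operation added by this patch can be invoked from a plain HAPI FHIR generic client in roughly the same way the tests above do it. The sketch below is illustrative only and is not part of the patch: the server base URL, subscription ID, and resource ID are placeholders, and it simply reuses the RESOURCE_ID and SEARCH_URL parameter names and the JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION constant exercised by SubscriptionTriggeringDstu3Test.

// Illustrative client-side sketch (not part of the patch). Base URL and IDs are placeholders.
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.hl7.fhir.dstu3.model.IdType;
import org.hl7.fhir.dstu3.model.Parameters;
import org.hl7.fhir.dstu3.model.StringType;
import org.hl7.fhir.dstu3.model.UriType;

public class TriggerSubscriptionExample {
   public static void main(String[] args) {
      FhirContext ctx = FhirContext.forDstu3();
      IGenericClient client = ctx.newRestfulGenericClient("http://localhost:8080/baseDstu3");

      // Ask the server to (re)deliver one specific resource plus everything matching a
      // search URL to a single subscription. The call returns immediately with a job ID;
      // the actual submission happens on the provider's scheduled delivery passes.
      Parameters response = client
         .operation()
         .onInstance(new IdType("Subscription/123"))
         .named(JpaConstants.OPERATION_TRIGGER_SUBSCRIPTION)
         .withParameter(Parameters.class, SubscriptionTriggeringProvider.RESOURCE_ID, new UriType("Observation/O2"))
         .andParameter(SubscriptionTriggeringProvider.SEARCH_URL, new StringType("Observation?status=final"))
         .execute();

      // The "information" parameter echoes the job ID, e.g.
      // "Subscription triggering job submitted as JOB ID: ..."
      System.out.println(response.getParameter().get(0).getValue().primitiveValue());
   }
}

Invoking the same operation with .onType(Subscription.class) instead of .onInstance(...) submits the matching resources to all active subscriptions, as exercised by testTriggerAllSubscriptions above.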