diff --git a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java
index 68a59ddede0..1f1d0ac1d67 100644
--- a/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java
+++ b/hapi-fhir-jpaserver-subscription/src/main/java/ca/uhn/fhir/jpa/subscription/triggering/SubscriptionTriggeringSvcImpl.java
@@ -52,6 +52,7 @@ import ca.uhn.fhir.util.ParametersUtil;
 import ca.uhn.fhir.util.StopWatch;
 import ca.uhn.fhir.util.UrlUtil;
 import ca.uhn.fhir.util.ValidateUtil;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.Validate;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -218,12 +219,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc

 		// Submit individual resources
 		AtomicInteger totalSubmitted = new AtomicInteger(0);
-		List<Pair<String, Future<Void>>> futures = new ArrayList<>();
+		List<Future<?>> futures = new ArrayList<>();
 		while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted.get() < myMaxSubmitPerPass) {
 			totalSubmitted.incrementAndGet();
 			String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
-			Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
-			futures.add(Pair.of(nextResourceId, future));
+			submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
 		}

 		// Make sure these all succeeded in submitting
@@ -266,55 +266,127 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc

 		// processing step for synchronous processing mode
 		if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) {
-			List<IBaseResource> allCurrentResources;
+			processSynchronous(theJobDetails, totalSubmitted, futures, search);
+		}

-			int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
+		// processing step for asynchronous processing mode
+		if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) {
+			processAsynchronous(theJobDetails, totalSubmitted, futures);
+		}

-			String searchUrl = theJobDetails.getCurrentSearchUrl();
+		ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS));
+	}

-			ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {}", theJobDetails.getJobId(), theJobDetails.getCurrentOffset(), fromIndex);
+	private void processAsynchronous(SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) {
+		int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;

-			int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
-			int toIndex = fromIndex + submittableCount;
+		IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());

-			if (nonNull(search) && !search.isEmpty()) {
+		int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get();
+		int toIndex;
+		if (theJobDetails.getCurrentSearchCount() != null) {
+			toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
+		} else {
+			toIndex = fromIndex + maxQuerySize;
+		}

-				// we already have data from the initial step so process as much as we can.
-				ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
-				allCurrentResources = search.getResources(0, toIndex);
+		ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);

-			} else {
-				if (theJobDetails.getCurrentSearchCount() != null) {
-					toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
+
+		List<IResourcePersistentId<?>> allResourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
+
+		ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size());
+		AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
+
+		List<List<IResourcePersistentId<?>>> partitions = Lists.partition(allResourceIds, 100);
+		for (List<IResourcePersistentId<?>> resourceIds : partitions) {
+			Runnable job = () -> {
+
+				String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
+				RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
+				ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(resourceDao, resourceType, resourceDef.getImplementingClass());
+				List<IBaseResource> listToPopulate = new ArrayList<>();
+
+				myTransactionService
+					.withRequest(null)
+					.execute(() -> {
+						searchBuilder.loadResourcesByPid(resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
+					});
+
+				for (IBaseResource nextResource : listToPopulate) {
+					submitResource(theJobDetails.getSubscriptionId(), nextResource);
+					totalSubmitted.incrementAndGet();
+					highestIndexSubmitted.incrementAndGet();
 				}
-				RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
-				String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
-				SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
-				int offset = theJobDetails.getCurrentOffset() + fromIndex;
-				params.setOffset(offset);
-				params.setCount(toIndex);
+			};

-				ourLog.info("Triggered job[{}] requesting {} resources from offset {}", theJobDetails.getJobId(), toIndex, offset);
+			Future<?> future = myExecutorService.submit(job);
+			futures.add(future);
+		}

-				search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
-				allCurrentResources = search.getAllResources();
+		if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
+
+			theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
+
+			if (allResourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
+				ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
+				theJobDetails.setCurrentSearchResourceType(null);
+				theJobDetails.setCurrentSearchUuid(null);
+				theJobDetails.setCurrentSearchLastUploadedIndex(-1);
+				theJobDetails.setCurrentSearchCount(null);
+			}
+		}
+	}
+
+	private void processSynchronous(SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures, IBundleProvider search) {
+		List<IBaseResource> allCurrentResources;
+
+		int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
+
+		String searchUrl = theJobDetails.getCurrentSearchUrl();
+
+		ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {}", theJobDetails.getJobId(), theJobDetails.getCurrentOffset(), fromIndex);
+
+		int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
+		int toIndex = fromIndex + submittableCount;
+
+		if (nonNull(search) && !search.isEmpty()) {
+
+			// we already have data from the initial step so process as much as we can.
+			ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
+			allCurrentResources = search.getResources(0, toIndex);
+
+		} else {
+			if (theJobDetails.getCurrentSearchCount() != null) {
+				toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
 			}
-			ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
-			AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
+			RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
+			String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
+			SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
+			int offset = theJobDetails.getCurrentOffset() + fromIndex;
+			params.setOffset(offset);
+			params.setCount(toIndex);
+
+			ourLog.info("Triggered job[{}] requesting {} resources from offset {}", theJobDetails.getJobId(), toIndex, offset);
+
+			search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
+			allCurrentResources = search.getAllResources();
+		}
+
+		ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
+		AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());

-			for (IBaseResource nextResource : allCurrentResources) {
-				Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
-				futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
-				totalSubmitted.incrementAndGet();
-				highestIndexSubmitted.incrementAndGet();
-			}
+		for (IBaseResource nextResource : allCurrentResources) {
+			Future<?> future = myExecutorService.submit(()->submitResource(theJobDetails.getSubscriptionId(), nextResource));
+			futures.add(future);
+			totalSubmitted.incrementAndGet();
+			highestIndexSubmitted.incrementAndGet();
+		}

-			if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
-				return;
-			}
+		if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {

 			theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());

@@ -329,64 +401,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
 			}
 		}
-
-		// processing step for asynchronous processing mode
-		if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) {
-			int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
-
-			IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
-
-			int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get();
-			int toIndex;
-			if (theJobDetails.getCurrentSearchCount() != null) {
-				toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
-			} else {
-				toIndex = fromIndex + maxQuerySize;
-			}
-
-			ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
-
-
-			List<IResourcePersistentId<?>> resourceIds;
-			resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
-
-			ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
-			AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
-
-			String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
-			RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
-			ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(resourceDao, resourceType, resourceDef.getImplementingClass());
-			List<IBaseResource> listToPopulate = new ArrayList<>();
-
-			myTransactionService
-				.withRequest(null)
-				.execute(() -> {
-					searchBuilder.loadResourcesByPid(resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
-				});
-
-			for (IBaseResource nextResource : listToPopulate) {
-				Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
-				futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
-				totalSubmitted.incrementAndGet();
-				highestIndexSubmitted.incrementAndGet();
-			}
-
-			if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
-				return;
-			}
-
-			theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
-
-			if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
-				ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
-				theJobDetails.setCurrentSearchResourceType(null);
-				theJobDetails.setCurrentSearchUuid(null);
-				theJobDetails.setCurrentSearchLastUploadedIndex(-1);
-				theJobDetails.setCurrentSearchCount(null);
-			}
-		}
-
-		ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS));
 	}

 	private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
@@ -397,57 +411,54 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
 		return isInitialStep(theJobDetails);
 	}

-	private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
+	private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) {

-		for (Pair<String, Future<Void>> next : theIdToFutures) {
-			String nextDeliveredId = next.getKey();
+		for (Future<?> nextFuture : theFutures) {
 			try {
-				Future<Void> nextFuture = next.getValue();
 				nextFuture.get();
-				ourLog.info("Finished redelivering {}", nextDeliveredId);
 			} catch (Exception e) {
-				ourLog.error("Failure triggering resource " + nextDeliveredId, e);
+				ourLog.error("Failure triggering resource", e);
 				return true;
 			}
 		}

 		// Clear the list since it will potentially get reused
-		theIdToFutures.clear();
+		theFutures.clear();
 		return false;
 	}

-	private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
+	private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
 		org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
 		IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
 		IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());
-		return submitResource(theSubscriptionId, resourceToTrigger);
+		submitResource(theSubscriptionId, resourceToTrigger);
 	}

-	private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
+	private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
 		ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);

 		ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
 		msg.setSubscriptionId(theSubscriptionId);

-		return myExecutorService.submit(() -> {
-			for (int i = 0; ; i++) {
-				try {
-					myResourceModifiedConsumer.submitResourceModified(msg);
-					break;
-				} catch (Exception e) {
-					if (i >= 3) {
-						throw new InternalErrorException(Msg.code(25) + e);
-					}
+		for (int i = 0; ; i++) {
+			try {
+				myResourceModifiedConsumer.submitResourceModified(msg);
+				break;
+			} catch (Exception e) {
+				if (i >= 3) {
+					throw new InternalErrorException(Msg.code(25) + e);
+				}

-					ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
+				ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
+				try {
 					Thread.sleep(1000);
+				} catch (InterruptedException ex) {
+					Thread.currentThread().interrupt();
 				}
 			}
-
-			return null;
-		});
+		}
 	}
@@ -499,7 +510,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
 		};
 		myExecutorService = new ThreadPoolExecutor(
 			0,
-			10,
+			20,
 			0L,
 			TimeUnit.MILLISECONDS,
 			executorQueue,
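
The heart of the new processAsynchronous() step is a simple pattern: partition the matched resource IDs into batches of 100, hand each batch to the shared executor as a Runnable that loads and delivers its resources, then block on the collected futures before advancing the checkpoint index. Below is a minimal standalone sketch of that pattern; the class name, the Long IDs, the pool size, and the deliver() helper are illustrative stand-ins, not HAPI FHIR API.

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchSubmitSketch {

	private final ExecutorService executor = Executors.newFixedThreadPool(20);

	public int submitInBatches(List<Long> resourceIds) throws Exception {
		AtomicInteger submitted = new AtomicInteger(0);
		List<Future<?>> futures = new ArrayList<>();

		// Each partition of 100 IDs becomes one executor task, bounding per-task work.
		for (List<Long> batch : Lists.partition(resourceIds, 100)) {
			futures.add(executor.submit(() -> {
				for (Long id : batch) {
					deliver(id); // hypothetical delivery step (load resource, push to channel)
					submitted.incrementAndGet();
				}
			}));
		}

		// Wait for every batch; a failed task surfaces as an ExecutionException here,
		// which is the moment to abort rather than advance the checkpoint.
		for (Future<?> future : futures) {
			future.get();
		}
		return submitted.get();
	}

	private void deliver(Long id) {
		// placeholder: the real service reads the resource and submits a
		// ResourceModifiedMessage for the subscription
	}
}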