diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java index 5b1aa53129f..15eaff67ac0 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/SubscriptionTriggeringProvider.java @@ -195,6 +195,9 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic return; } + String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", ")); + ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds); + SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); runJob(activeJob); @@ -219,6 +222,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { StopWatch sw = new StopWatch(); + ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId()); // Submit individual resources int totalSubmitted = 0; @@ -244,14 +248,13 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic theJobDetails.setCurrentSearchUuid(search.getUuid()); theJobDetails.setCurrentSearchResourceType(resourceType); theJobDetails.setCurrentSearchCount(params.getCount()); + theJobDetails.setCurrentSearchCount(null); + theJobDetails.setCurrentSearchLastUploadedIndex(-1); } // If we have an active search going, submit resources from it if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { - int fromIndex = 0; - if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) { - fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; - } + int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1; IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); @@ -260,26 +263,28 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic if (theJobDetails.getCurrentSearchCount() != null) { toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); } - ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid()); + ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); List resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); + + ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); for (Long next : resourceIds) { IBaseResource nextResource = resourceDao.readByPid(next); submitResource(theJobDetails.getSubscriptionId(), nextResource); totalSubmitted++; - theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1); + theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1); } int expectedCount = toIndex - fromIndex; - if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { - ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); + if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { + ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); theJobDetails.setCurrentSearchResourceType(null); theJobDetails.setCurrentSearchUuid(null); - theJobDetails.setCurrentSearchLastUploadedIndex(null); + theJobDetails.setCurrentSearchLastUploadedIndex(-1); theJobDetails.setCurrentSearchCount(null); } } - ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); + ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); } private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { @@ -325,7 +330,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic private String myCurrentSearchUuid; private Integer myCurrentSearchCount; private String myCurrentSearchResourceType; - private Integer myCurrentSearchLastUploadedIndex; + private int myCurrentSearchLastUploadedIndex; public Integer getCurrentSearchCount() { return myCurrentSearchCount; @@ -383,11 +388,11 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic myCurrentSearchUuid = theCurrentSearchUuid; } - public Integer getCurrentSearchLastUploadedIndex() { + public int getCurrentSearchLastUploadedIndex() { return myCurrentSearchLastUploadedIndex; } - public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) { + public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) { myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; } } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java index b426cc342fd..0fe6d1c959b 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/subscription/SubscriptionTriggeringDstu3Test.java @@ -71,6 +71,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te ourSubscriptionTriggeringProvider.cancelAll(); ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null); + + myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds()); } @Before @@ -169,6 +171,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te @Test public void testTriggerUsingMultipleSearches() throws Exception { + myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(13, 22, 100)); + String payload = "application/fhir+json"; IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); @@ -216,6 +220,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te responseValue = response.getParameter().get(0).getValue().primitiveValue(); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); +// Thread.sleep(1000000000); + waitForSize(51, ourUpdatedObservations); waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedPatients);