One more optimization to the subscription retriggering logic

This commit is contained in:
James Agnew 2018-10-17 05:53:07 -04:00
parent 2c05d9c5db
commit 8130700d68
2 changed files with 24 additions and 13 deletions

View File

@ -195,6 +195,9 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
return;
}
String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", "));
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
runJob(activeJob);
@ -219,6 +222,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
StopWatch sw = new StopWatch();
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
// Submit individual resources
int totalSubmitted = 0;
@ -244,14 +248,13 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount());
theJobDetails.setCurrentSearchCount(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
}
// If we have an active search going, submit resources from it
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = 0;
if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) {
fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
}
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
@ -260,26 +263,28 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid());
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
for (Long next : resourceIds) {
IBaseResource nextResource = resourceDao.readByPid(next);
submitResource(theJobDetails.getSubscriptionId(), nextResource);
totalSubmitted++;
theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1);
theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1);
}
int expectedCount = toIndex - fromIndex;
if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
}
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
@ -325,7 +330,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
private String myCurrentSearchUuid;
private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType;
private Integer myCurrentSearchLastUploadedIndex;
private int myCurrentSearchLastUploadedIndex;
public Integer getCurrentSearchCount() {
return myCurrentSearchCount;
@ -383,11 +388,11 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
myCurrentSearchUuid = theCurrentSearchUuid;
}
public Integer getCurrentSearchLastUploadedIndex() {
public int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex;
}
public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) {
public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
}
}

View File

@ -71,6 +71,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourSubscriptionTriggeringProvider.cancelAll();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null);
myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds());
}
@Before
@ -169,6 +171,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
@Test
public void testTriggerUsingMultipleSearches() throws Exception {
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(13, 22, 100));
String payload = "application/fhir+json";
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
@ -216,6 +220,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
// Thread.sleep(1000000000);
waitForSize(51, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients);