Merge branch 'master' of github.com:jamesagnew/hapi-fhir

This commit is contained in:
jamesagnew 2018-10-17 05:53:46 -04:00
commit aa1f624132
2 changed files with 24 additions and 13 deletions

View File

@ -195,6 +195,9 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
return; return;
} }
String activeJobIds = myActiveJobs.stream().map(t->t.getJobId()).collect(Collectors.joining(", "));
ourLog.info("Starting pass: currently have {} active job IDs: {}", myActiveJobs.size(), activeJobIds);
SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0); SubscriptionTriggeringJobDetails activeJob = myActiveJobs.get(0);
runJob(activeJob); runJob(activeJob);
@ -219,6 +222,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
private void runJob(SubscriptionTriggeringJobDetails theJobDetails) { private void runJob(SubscriptionTriggeringJobDetails theJobDetails) {
StopWatch sw = new StopWatch(); StopWatch sw = new StopWatch();
ourLog.info("Starting pass of subscription triggering job {}", theJobDetails.getJobId());
// Submit individual resources // Submit individual resources
int totalSubmitted = 0; int totalSubmitted = 0;
@ -244,14 +248,13 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
theJobDetails.setCurrentSearchUuid(search.getUuid()); theJobDetails.setCurrentSearchUuid(search.getUuid());
theJobDetails.setCurrentSearchResourceType(resourceType); theJobDetails.setCurrentSearchResourceType(resourceType);
theJobDetails.setCurrentSearchCount(params.getCount()); theJobDetails.setCurrentSearchCount(params.getCount());
theJobDetails.setCurrentSearchCount(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
} }
// If we have an active search going, submit resources from it // If we have an active search going, submit resources from it
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) { if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted < myMaxSubmitPerPass) {
int fromIndex = 0; int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
if (theJobDetails.getCurrentSearchLastUploadedIndex() != null) {
fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
}
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType()); IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
@ -260,26 +263,28 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
if (theJobDetails.getCurrentSearchCount() != null) { if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount()); toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
} }
ourLog.info("Triggering job[{}] submitting up to {} resources for search {}", theJobDetails.getJobId(), maxQuerySize, theJobDetails.getCurrentSearchUuid()); ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex); List<Long> resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
for (Long next : resourceIds) { for (Long next : resourceIds) {
IBaseResource nextResource = resourceDao.readByPid(next); IBaseResource nextResource = resourceDao.readByPid(next);
submitResource(theJobDetails.getSubscriptionId(), nextResource); submitResource(theJobDetails.getSubscriptionId(), nextResource);
totalSubmitted++; totalSubmitted++;
theJobDetails.setCurrentSearchLastUploadedIndex(toIndex - 1); theJobDetails.setCurrentSearchLastUploadedIndex(theJobDetails.getCurrentSearchLastUploadedIndex()+1);
} }
int expectedCount = toIndex - fromIndex; int expectedCount = toIndex - fromIndex;
if (resourceIds.size() < expectedCount || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) { if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid()); ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null); theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null); theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(null); theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null); theJobDetails.setCurrentSearchCount(null);
} }
} }
ourLog.info("Subscription trigger job[{}] triggered {} resources in {} ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS)); ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted, TimeUnit.SECONDS));
} }
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) { private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
@ -325,7 +330,7 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
private String myCurrentSearchUuid; private String myCurrentSearchUuid;
private Integer myCurrentSearchCount; private Integer myCurrentSearchCount;
private String myCurrentSearchResourceType; private String myCurrentSearchResourceType;
private Integer myCurrentSearchLastUploadedIndex; private int myCurrentSearchLastUploadedIndex;
public Integer getCurrentSearchCount() { public Integer getCurrentSearchCount() {
return myCurrentSearchCount; return myCurrentSearchCount;
@ -383,11 +388,11 @@ public class SubscriptionTriggeringProvider implements IResourceProvider, Applic
myCurrentSearchUuid = theCurrentSearchUuid; myCurrentSearchUuid = theCurrentSearchUuid;
} }
public Integer getCurrentSearchLastUploadedIndex() { public int getCurrentSearchLastUploadedIndex() {
return myCurrentSearchLastUploadedIndex; return myCurrentSearchLastUploadedIndex;
} }
public void setCurrentSearchLastUploadedIndex(Integer theCurrentSearchLastUploadedIndex) { public void setCurrentSearchLastUploadedIndex(int theCurrentSearchLastUploadedIndex) {
myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex; myCurrentSearchLastUploadedIndex = theCurrentSearchLastUploadedIndex;
} }
} }

View File

@ -71,6 +71,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
ourSubscriptionTriggeringProvider.cancelAll(); ourSubscriptionTriggeringProvider.cancelAll();
ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null); ourSubscriptionTriggeringProvider.setMaxSubmitPerPass(null);
myDaoConfig.setSearchPreFetchThresholds(new DaoConfig().getSearchPreFetchThresholds());
} }
@Before @Before
@ -169,6 +171,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
@Test @Test
public void testTriggerUsingMultipleSearches() throws Exception { public void testTriggerUsingMultipleSearches() throws Exception {
myDaoConfig.setSearchPreFetchThresholds(Lists.newArrayList(13, 22, 100));
String payload = "application/fhir+json"; String payload = "application/fhir+json";
IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement(); IdType sub1id = createSubscription("Observation?", payload, ourListenerServerBase).getIdElement();
IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement(); IdType sub2id = createSubscription("Patient?", payload, ourListenerServerBase).getIdElement();
@ -216,6 +220,8 @@ public class SubscriptionTriggeringDstu3Test extends BaseResourceProviderDstu3Te
responseValue = response.getParameter().get(0).getValue().primitiveValue(); responseValue = response.getParameter().get(0).getValue().primitiveValue();
assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID")); assertThat(responseValue, containsString("Subscription triggering job submitted as JOB ID"));
// Thread.sleep(1000000000);
waitForSize(51, ourUpdatedObservations); waitForSize(51, ourUpdatedObservations);
waitForSize(0, ourCreatedObservations); waitForSize(0, ourCreatedObservations);
waitForSize(0, ourCreatedPatients); waitForSize(0, ourCreatedPatients);