Parallelize subscription triggering

This commit is contained in:
James Agnew 2023-04-18 16:39:45 -04:00
parent cae75767c5
commit ffedab5524
1 changed file with 129 additions and 118 deletions

View File

@ -52,6 +52,7 @@ import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.UrlUtil;
import ca.uhn.fhir.util.ValidateUtil;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@ -218,12 +219,11 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
// Submit individual resources
AtomicInteger totalSubmitted = new AtomicInteger(0);
List<Pair<String, Future<Void>>> futures = new ArrayList<>();
List<Future<?>> futures = new ArrayList<>();
while (theJobDetails.getRemainingResourceIds().size() > 0 && totalSubmitted.get() < myMaxSubmitPerPass) {
totalSubmitted.incrementAndGet();
String nextResourceId = theJobDetails.getRemainingResourceIds().remove(0);
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
futures.add(Pair.of(nextResourceId, future));
submitResource(theJobDetails.getSubscriptionId(), nextResourceId);
}
// Make sure these all succeeded in submitting
@ -266,55 +266,127 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
// processing step for synchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUrl()) && totalSubmitted.get() < myMaxSubmitPerPass) {
List<IBaseResource> allCurrentResources;
processSynchronous(theJobDetails, totalSubmitted, futures, search);
}
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
// processing step for asynchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) {
processAsynchronous(theJobDetails, totalSubmitted, futures);
}
String searchUrl = theJobDetails.getCurrentSearchUrl();
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS));
}
ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {}", theJobDetails.getJobId(), theJobDetails.getCurrentOffset(), fromIndex);
private void processAsynchronous(SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex = fromIndex + submittableCount;
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
if (nonNull(search) && !search.isEmpty()) {
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
} else {
toIndex = fromIndex + maxQuerySize;
}
// we already have data from the initial step so process as much as we can.
ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
allCurrentResources = search.getResources(0, toIndex);
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
} else {
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
List<IResourcePersistentId<?>> allResourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), allResourceIds.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
List<List<IResourcePersistentId<?>>> partitions = Lists.partition(allResourceIds, 100);
for (List<IResourcePersistentId<?>> resourceIds : partitions) {
Runnable job = () -> {
String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(resourceDao, resourceType, resourceDef.getImplementingClass());
List<IBaseResource> listToPopulate = new ArrayList<>();
myTransactionService
.withRequest(null)
.execute(() -> {
searchBuilder.loadResourcesByPid(resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
});
for (IBaseResource nextResource : listToPopulate) {
submitResource(theJobDetails.getSubscriptionId(), nextResource);
totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet();
}
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
int offset = theJobDetails.getCurrentOffset() + fromIndex;
params.setOffset(offset);
params.setCount(toIndex);
};
ourLog.info("Triggered job[{}] requesting {} resources from offset {}", theJobDetails.getJobId(), toIndex, offset);
Future<?> future = myExecutorService.submit(job);
futures.add(future);
}
search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
allCurrentResources = search.getAllResources();
if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
if (allResourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
}
private void processSynchronous(SubscriptionTriggeringJobDetails theJobDetails, AtomicInteger totalSubmitted, List<Future<?>> futures, IBundleProvider search) {
List<IBaseResource> allCurrentResources;
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
String searchUrl = theJobDetails.getCurrentSearchUrl();
ourLog.info("Triggered job [{}] - Starting synchronous processing at offset {} and index {}", theJobDetails.getJobId(), theJobDetails.getCurrentOffset(), fromIndex);
int submittableCount = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex = fromIndex + submittableCount;
if (nonNull(search) && !search.isEmpty()) {
// we already have data from the initial step so process as much as we can.
ourLog.info("Triggered job[{}] will process up to {} resources", theJobDetails.getJobId(), toIndex);
allCurrentResources = search.getResources(0, toIndex);
} else {
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(toIndex, theJobDetails.getCurrentSearchCount());
}
ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
RuntimeResourceDefinition resourceDef = UrlUtil.parseUrlResourceType(myFhirContext, searchUrl);
String queryPart = searchUrl.substring(searchUrl.indexOf('?'));
SearchParameterMap params = myMatchUrlService.translateMatchUrl(queryPart, resourceDef);
int offset = theJobDetails.getCurrentOffset() + fromIndex;
params.setOffset(offset);
params.setCount(toIndex);
ourLog.info("Triggered job[{}] requesting {} resources from offset {}", theJobDetails.getJobId(), toIndex, offset);
search = mySearchService.executeQuery(resourceDef.getName(), params, RequestPartitionId.allPartitions());
allCurrentResources = search.getAllResources();
}
ourLog.info("Triggered job[{}] delivering {} resources", theJobDetails.getJobId(), allCurrentResources.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
for (IBaseResource nextResource : allCurrentResources) {
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet();
}
for (IBaseResource nextResource : allCurrentResources) {
Future<?> future = myExecutorService.submit(()->submitResource(theJobDetails.getSubscriptionId(), nextResource));
futures.add(future);
totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet();
}
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
if (!validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
@ -329,64 +401,6 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
}
}
// processing step for asynchronous processing mode
if (isNotBlank(theJobDetails.getCurrentSearchUuid()) && totalSubmitted.get() < myMaxSubmitPerPass) {
int fromIndex = theJobDetails.getCurrentSearchLastUploadedIndex() + 1;
IFhirResourceDao<?> resourceDao = myDaoRegistry.getResourceDao(theJobDetails.getCurrentSearchResourceType());
int maxQuerySize = myMaxSubmitPerPass - totalSubmitted.get();
int toIndex;
if (theJobDetails.getCurrentSearchCount() != null) {
toIndex = Math.min(fromIndex + maxQuerySize, theJobDetails.getCurrentSearchCount());
} else {
toIndex = fromIndex + maxQuerySize;
}
ourLog.info("Triggering job[{}] search {} requesting resources {} - {}", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex);
List<IResourcePersistentId<?>> resourceIds;
resourceIds = mySearchCoordinatorSvc.getResources(theJobDetails.getCurrentSearchUuid(), fromIndex, toIndex, null);
ourLog.info("Triggering job[{}] delivering {} resources", theJobDetails.getJobId(), resourceIds.size());
AtomicInteger highestIndexSubmitted = new AtomicInteger(theJobDetails.getCurrentSearchLastUploadedIndex());
String resourceType = myFhirContext.getResourceType(theJobDetails.getCurrentSearchResourceType());
RuntimeResourceDefinition resourceDef = myFhirContext.getResourceDefinition(theJobDetails.getCurrentSearchResourceType());
ISearchBuilder searchBuilder = mySearchBuilderFactory.newSearchBuilder(resourceDao, resourceType, resourceDef.getImplementingClass());
List<IBaseResource> listToPopulate = new ArrayList<>();
myTransactionService
.withRequest(null)
.execute(() -> {
searchBuilder.loadResourcesByPid(resourceIds, Collections.emptyList(), listToPopulate, false, new SystemRequestDetails());
});
for (IBaseResource nextResource : listToPopulate) {
Future<Void> future = submitResource(theJobDetails.getSubscriptionId(), nextResource);
futures.add(Pair.of(nextResource.getIdElement().getIdPart(), future));
totalSubmitted.incrementAndGet();
highestIndexSubmitted.incrementAndGet();
}
if (validateFuturesAndReturnTrueIfWeShouldAbort(futures)) {
return;
}
theJobDetails.setCurrentSearchLastUploadedIndex(highestIndexSubmitted.get());
if (resourceIds.size() == 0 || (theJobDetails.getCurrentSearchCount() != null && toIndex >= theJobDetails.getCurrentSearchCount())) {
ourLog.info("Triggering job[{}] search {} has completed ", theJobDetails.getJobId(), theJobDetails.getCurrentSearchUuid());
theJobDetails.setCurrentSearchResourceType(null);
theJobDetails.setCurrentSearchUuid(null);
theJobDetails.setCurrentSearchLastUploadedIndex(-1);
theJobDetails.setCurrentSearchCount(null);
}
}
ourLog.info("Subscription trigger job[{}] triggered {} resources in {}ms ({} res / second)", theJobDetails.getJobId(), totalSubmitted, sw.getMillis(), sw.getThroughput(totalSubmitted.get(), TimeUnit.SECONDS));
}
private boolean isInitialStep(SubscriptionTriggeringJobDetails theJobDetails) {
@ -397,57 +411,54 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
return isInitialStep(theJobDetails);
}
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Pair<String, Future<Void>>> theIdToFutures) {
private boolean validateFuturesAndReturnTrueIfWeShouldAbort(List<Future<?>> theFutures) {
for (Pair<String, Future<Void>> next : theIdToFutures) {
String nextDeliveredId = next.getKey();
for (Future<?> nextFuture : theFutures) {
try {
Future<Void> nextFuture = next.getValue();
nextFuture.get();
ourLog.info("Finished redelivering {}", nextDeliveredId);
} catch (Exception e) {
ourLog.error("Failure triggering resource " + nextDeliveredId, e);
ourLog.error("Failure triggering resource", e);
return true;
}
}
// Clear the list since it will potentially get reused
theIdToFutures.clear();
theFutures.clear();
return false;
}
private Future<Void> submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
private void submitResource(String theSubscriptionId, String theResourceIdToTrigger) {
org.hl7.fhir.r4.model.IdType resourceId = new org.hl7.fhir.r4.model.IdType(theResourceIdToTrigger);
IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceId.getResourceType());
IBaseResource resourceToTrigger = dao.read(resourceId, SystemRequestDetails.forAllPartitions());
return submitResource(theSubscriptionId, resourceToTrigger);
submitResource(theSubscriptionId, resourceToTrigger);
}
private Future<Void> submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
private void submitResource(String theSubscriptionId, IBaseResource theResourceToTrigger) {
ourLog.info("Submitting resource {} to subscription {}", theResourceToTrigger.getIdElement().toUnqualifiedVersionless().getValue(), theSubscriptionId);
ResourceModifiedMessage msg = new ResourceModifiedMessage(myFhirContext, theResourceToTrigger, ResourceModifiedMessage.OperationTypeEnum.UPDATE);
msg.setSubscriptionId(theSubscriptionId);
return myExecutorService.submit(() -> {
for (int i = 0; ; i++) {
try {
myResourceModifiedConsumer.submitResourceModified(msg);
break;
} catch (Exception e) {
if (i >= 3) {
throw new InternalErrorException(Msg.code(25) + e);
}
for (int i = 0; ; i++) {
try {
myResourceModifiedConsumer.submitResourceModified(msg);
break;
} catch (Exception e) {
if (i >= 3) {
throw new InternalErrorException(Msg.code(25) + e);
}
ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
ourLog.warn("Exception while retriggering subscriptions (going to sleep and retry): {}", e.toString());
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
return null;
});
}
}
@ -499,7 +510,7 @@ public class SubscriptionTriggeringSvcImpl implements ISubscriptionTriggeringSvc
};
myExecutorService = new ThreadPoolExecutor(
0,
10,
20,
0L,
TimeUnit.MILLISECONDS,
executorQueue,