Threaded validation of contained resources initial checkin

This commit is contained in:
Jaison B 2021-11-08 17:20:53 -07:00
parent 4cab7f8b80
commit f47cfec3d0
1 changed files with 82 additions and 0 deletions

View File

@ -27,9 +27,12 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.BundleUtil; import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.validation.schematron.SchematronProvider; import ca.uhn.fhir.validation.schematron.SchematronProvider;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IDomainResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,6 +41,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -250,6 +254,14 @@ public class FhirValidator {
} }
} }
/*if (myConcurrentBundleValidation) {
if (myExecutorService != null) {
return validateResourceConcurrently(theResource, theOptions);
} else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation.");
}
}*/
return validateResource(theResource, theOptions); return validateResource(theResource, theOptions);
} }
@ -311,6 +323,58 @@ public class FhirValidator {
return theValidationResult; return theValidationResult;
} }
private ValidationResult validateResourceConcurrently(IBaseResource theResource, ValidationOptions theOptions) {
List<ConcurrentValidationTask> validationTasks = new ArrayList<>();
populateConcurrentValidationTasks(theResource, theOptions, "", validationTasks);
List<SingleValidationMessage> messages = new ArrayList<>();
for (ConcurrentValidationTask validationTask : validationTasks) {
try {
ValidationResult result = validationTask.getFuture().get();
messages.addAll(result.getMessages().stream()
.map(message -> {
String currentPath = message.getLocationString().substring(message.getLocationString().indexOf('.'));
message.setLocationString(validationTask.getResourcePathPrefix() + currentPath);
return message;
}).collect(Collectors.toList()));
} catch (InterruptedException | ExecutionException exp) {
throw new InternalErrorException(exp);
}
}
return new ValidationResult(myContext, messages);
}
private void populateConcurrentValidationTasks(IBaseResource theResource, ValidationOptions theOptions, String resourcePathPrefix, List<ConcurrentValidationTask> theValidationTasks) {
if (theResource instanceof IBaseBundle) {
List<IBaseResource> bundleEntries = BundleUtil.toListOfResources(myContext, (IBaseBundle) theResource);
for (int i = 0; i < bundleEntries.size(); i++) {
IBaseResource entry = bundleEntries.get(i);
String bundleEntryPath = String.format("Bundle.entry[%d].resource.ofType(%s)", i, entry.fhirType());
populateConcurrentValidationTasks(entry, theOptions, bundleEntryPath, theValidationTasks);
}
} else if (theResource instanceof IDomainResource) {
IDomainResource domainResource = (IDomainResource) theResource;
if (domainResource.getContained().size() > 0) {
// validate contained resources
List<? extends IAnyResource> containedResources = domainResource.getContained();
for (int i = 0; i < containedResources.size(); i++) {
IBaseResource containedResource = containedResources.get(i);
String containedResourcePath = String.format("%s.contained[%d].ofType(%s)", resourcePathPrefix, i, containedResource.fhirType());
populateConcurrentValidationTasks(containedResource, theOptions, containedResourcePath, theValidationTasks);
}
} else {
theValidationTasks.add(new ConcurrentValidationTask(
resourcePathPrefix,
myExecutorService.submit(() -> validateResource(domainResource, theOptions))
));
}
} else {
theValidationTasks.add(new ConcurrentValidationTask(
resourcePathPrefix,
myExecutorService.submit(() -> validateResource(theResource, theOptions))
));
}
}
/** /**
* Optionally supplies an interceptor broadcaster that will be used to invoke validation related Pointcut events * Optionally supplies an interceptor broadcaster that will be used to invoke validation related Pointcut events
* *
@ -342,4 +406,22 @@ public class FhirValidator {
myConcurrentBundleValidation = theConcurrentBundleValidation; myConcurrentBundleValidation = theConcurrentBundleValidation;
return this; return this;
} }
private static class ConcurrentValidationTask {
private final String myResourcePathPrefix;
private final Future<ValidationResult> myFuture;
private ConcurrentValidationTask(String theResourcePathPrefix, Future<ValidationResult> theFuture) {
myResourcePathPrefix = theResourcePathPrefix;
myFuture = theFuture;
}
public String getResourcePathPrefix() {
return myResourcePathPrefix;
}
public Future<ValidationResult> getFuture() {
return myFuture;
}
}
} }