Merge branch 'issue-3144-multi-threaded-bundle-validation' of github.com:hapifhir/hapi-fhir into issue-3144-multi-threaded-bundle-validation

This commit is contained in:
Ken Stevens 2021-11-08 19:30:30 -05:00
commit 8037eefd75
2 changed files with 119 additions and 43 deletions

View File

@ -27,18 +27,23 @@ import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.BundleUtil; import ca.uhn.fhir.util.BundleUtil;
import ca.uhn.fhir.validation.schematron.SchematronProvider; import ca.uhn.fhir.validation.schematron.SchematronProvider;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseBundle; import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IDomainResource;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream;
/** /**
@ -84,8 +89,7 @@ public class FhirValidator {
registerValidatorModule(theInstance); registerValidatorModule(theInstance);
} }
} else { } else {
for (Iterator<IValidatorModule> iter = myValidators.iterator(); iter.hasNext(); ) { for (IValidatorModule next : myValidators) {
IValidatorModule next = iter.next();
if (next.getClass().equals(type)) { if (next.getClass().equals(type)) {
unregisterValidatorModule(next); unregisterValidatorModule(next);
} }
@ -98,6 +102,7 @@ public class FhirValidator {
for (IValidatorModule next : myValidators) { for (IValidatorModule next : myValidators) {
if (next.getClass().equals(type)) { if (next.getClass().equals(type)) {
found = true; found = true;
break;
} }
} }
return found; return found;
@ -211,7 +216,21 @@ public class FhirValidator {
* @since 1.1 * @since 1.1
*/ */
public ValidationResult validateWithResult(String theResource) { public ValidationResult validateWithResult(String theResource) {
return validateWithResult(theResource, null); IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, null);
return validateWithResult(ctx.getResource(), null);
}
/**
* Validates a resource instance returning a {@link ValidationResult} which contains the results.
*
* @param theResource the resource to validate
* @param theOptions Optionally provides options to the validator
* @return the results of validation
* @since 4.0.0
*/
public ValidationResult validateWithResult(String theResource, ValidationOptions theOptions) {
IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, theOptions);
return validateResource(ctx.getResource(), theOptions);
} }
/** /**
@ -235,28 +254,45 @@ public class FhirValidator {
} }
} }
/*if (myConcurrentBundleValidation) {
if (myExecutorService != null) {
return validateResourceConcurrently(theResource, theOptions);
} else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation.");
}
}*/
return validateResource(theResource, theOptions); return validateResource(theResource, theOptions);
} }
private ValidationResult validateBundleEntriesConcurrently(IBaseBundle theBundle, ValidationOptions theOptions) { private ValidationResult validateBundleEntriesConcurrently(IBaseBundle theBundle, ValidationOptions theOptions) {
List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, theBundle); List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, theBundle);
List<String> entryBundlePaths = IntStream.range(0, entries.size())
.mapToObj(index -> String.format("Bundle.entry[%d].resource.ofType(%s)", index, entries.get(index).fhirType()))
.collect(Collectors.toList());
List<Future<ValidationResult>> futures = new ArrayList<>(); List<Future<ValidationResult>> futures = entries.stream()
for (IBaseResource entry : entries) { .map(entry -> myExecutorService.submit(() -> validateResource(entry, theOptions)))
futures.add(myExecutorService.submit(() -> validateResource(entry, theOptions))); .collect(Collectors.toList());
}
List<SingleValidationMessage> validationMessages = new ArrayList<>(); List<SingleValidationMessage> validationMessages = new ArrayList<>();
try { try {
for (Future<ValidationResult> future : futures) { for (int i = 0; i < futures.size(); i++) {
ValidationResult result = future.get(); ValidationResult result = futures.get(i).get();
// FIXME JB prepend bundle entry details so we know which entry has the errors final String bundleEntryPath = entryBundlePaths.get(i);
validationMessages.addAll(result.getMessages()); List<SingleValidationMessage> messages = result.getMessages().stream()
.map(message -> {
String currentPath = message.getLocationString().substring(message.getLocationString().indexOf('.'));
message.setLocationString(bundleEntryPath + currentPath);
return message;
})
.collect(Collectors.toList());
validationMessages.addAll(messages);
} }
} catch (Exception e) { } catch (InterruptedException | ExecutionException exp) {
throw new InternalErrorException(e); throw new InternalErrorException(exp);
} }
return new ValidationResult(myContext, validationMessages.stream().collect(Collectors.toList())); return new ValidationResult(myContext, new ArrayList<>(validationMessages));
} }
private ValidationResult validateResource(IBaseResource theResource, ValidationOptions theOptions) { private ValidationResult validateResource(IBaseResource theResource, ValidationOptions theOptions) {
@ -267,7 +303,7 @@ public class FhirValidator {
} }
ValidationResult result = ctx.toResult(); ValidationResult result = ctx.toResult();
result = invokeValidationCompletedHooks(theResource, null, result); result = invokeValidationCompletedHooks(ctx.getResource(), ctx.getResourceAsString(), result);
return result; return result;
} }
@ -287,37 +323,56 @@ public class FhirValidator {
return theValidationResult; return theValidationResult;
} }
/** private ValidationResult validateResourceConcurrently(IBaseResource theResource, ValidationOptions theOptions) {
* Validates a resource instance returning a {@link ValidationResult} which contains the results. List<ConcurrentValidationTask> validationTasks = new ArrayList<>();
* populateConcurrentValidationTasks(theResource, theOptions, "", validationTasks);
* @param theResource the resource to validate List<SingleValidationMessage> messages = new ArrayList<>();
* @param theOptions Optionally provides options to the validator for (ConcurrentValidationTask validationTask : validationTasks) {
* @return the results of validation try {
* @since 4.0.0 ValidationResult result = validationTask.getFuture().get();
*/ messages.addAll(result.getMessages().stream()
// FIXME JB consolidate this method with the other one that also calls applyDefaultValidators() .map(message -> {
public ValidationResult validateWithResult(String theResource, ValidationOptions theOptions) { String currentPath = message.getLocationString().substring(message.getLocationString().indexOf('.'));
Validate.notNull(theResource, "theResource must not be null"); message.setLocationString(validationTask.getResourcePathPrefix() + currentPath);
return message;
applyDefaultValidators(); }).collect(Collectors.toList()));
} catch (InterruptedException | ExecutionException exp) {
IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, theOptions); throw new InternalErrorException(exp);
if (myConcurrentBundleValidation && ctx.getResource() instanceof IBaseBundle) {
if (myExecutorService != null) {
return validateBundleEntriesConcurrently((IBaseBundle) ctx.getResource(), theOptions);
} else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation.");
} }
} }
return new ValidationResult(myContext, messages);
}
for (IValidatorModule next : myValidators) { private void populateConcurrentValidationTasks(IBaseResource theResource, ValidationOptions theOptions, String resourcePathPrefix, List<ConcurrentValidationTask> theValidationTasks) {
next.validateResource(ctx); if (theResource instanceof IBaseBundle) {
List<IBaseResource> bundleEntries = BundleUtil.toListOfResources(myContext, (IBaseBundle) theResource);
for (int i = 0; i < bundleEntries.size(); i++) {
IBaseResource entry = bundleEntries.get(i);
String bundleEntryPath = String.format("Bundle.entry[%d].resource.ofType(%s)", i, entry.fhirType());
populateConcurrentValidationTasks(entry, theOptions, bundleEntryPath, theValidationTasks);
}
} else if (theResource instanceof IDomainResource) {
IDomainResource domainResource = (IDomainResource) theResource;
if (domainResource.getContained().size() > 0) {
// validate contained resources
List<? extends IAnyResource> containedResources = domainResource.getContained();
for (int i = 0; i < containedResources.size(); i++) {
IBaseResource containedResource = containedResources.get(i);
String containedResourcePath = String.format("%s.contained[%d].ofType(%s)", resourcePathPrefix, i, containedResource.fhirType());
populateConcurrentValidationTasks(containedResource, theOptions, containedResourcePath, theValidationTasks);
}
} else {
theValidationTasks.add(new ConcurrentValidationTask(
resourcePathPrefix,
myExecutorService.submit(() -> validateResource(domainResource, theOptions))
));
}
} else {
theValidationTasks.add(new ConcurrentValidationTask(
resourcePathPrefix,
myExecutorService.submit(() -> validateResource(theResource, theOptions))
));
} }
ValidationResult result = ctx.toResult();
result = invokeValidationCompletedHooks(null, theResource, result);
return result;
} }
/** /**
@ -351,4 +406,22 @@ public class FhirValidator {
myConcurrentBundleValidation = theConcurrentBundleValidation; myConcurrentBundleValidation = theConcurrentBundleValidation;
return this; return this;
} }
private static class ConcurrentValidationTask {
private final String myResourcePathPrefix;
private final Future<ValidationResult> myFuture;
private ConcurrentValidationTask(String theResourcePathPrefix, Future<ValidationResult> theFuture) {
myResourcePathPrefix = theResourcePathPrefix;
myFuture = theFuture;
}
public String getResourcePathPrefix() {
return myResourcePathPrefix;
}
public Future<ValidationResult> getFuture() {
return myFuture;
}
}
} }

View File

@ -1524,6 +1524,9 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
StopWatch stopwatch = new StopWatch(); StopWatch stopwatch = new StopWatch();
ValidationResult output = myFhirValidator.validateWithResult(bundle); ValidationResult output = myFhirValidator.validateWithResult(bundle);
ourLog.info("Validation time: {}", stopwatch); ourLog.info("Validation time: {}", stopwatch);
// assert that validation messages include the bundle entry path
assertTrue(output.getMessages().stream().anyMatch(message -> message.getLocationString().contains("Bundle.entry[0].resource.ofType(Patient)")));
assertTrue(output.getMessages().stream().anyMatch(message -> message.getLocationString().contains("Bundle.entry[1].resource.ofType(Patient)")));
// validate // validate
List<SingleValidationMessage> all = logResultsAndReturnErrorOnes(output); List<SingleValidationMessage> all = logResultsAndReturnErrorOnes(output);
assertThat(output.getMessages(), hasSize(entriesCount * 2)); assertThat(output.getMessages(), hasSize(entriesCount * 2));