FHIR validator refactored to maintain correct bundle path and async validation task

This commit is contained in:
Jaison B 2021-11-09 08:33:37 -07:00
parent 8037eefd75
commit 1ee641ba16
2 changed files with 47 additions and 92 deletions

View File

@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -216,8 +217,7 @@ public class FhirValidator {
* @since 1.1 * @since 1.1
*/ */
public ValidationResult validateWithResult(String theResource) { public ValidationResult validateWithResult(String theResource) {
IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, null); return validateWithResult(theResource, null);
return validateWithResult(ctx.getResource(), null);
} }
/** /**
@ -229,8 +229,21 @@ public class FhirValidator {
* @since 4.0.0 * @since 4.0.0
*/ */
public ValidationResult validateWithResult(String theResource, ValidationOptions theOptions) { public ValidationResult validateWithResult(String theResource, ValidationOptions theOptions) {
IValidationContext<IBaseResource> ctx = ValidationContext.forText(myContext, theResource, theOptions); Validate.notNull(theResource, "theResource must not be null");
return validateResource(ctx.getResource(), theOptions);
applyDefaultValidators();
IValidationContext<IBaseResource> validationContext = ValidationContext.forText(myContext, theResource, theOptions);
ValidationResult result;
if (myConcurrentBundleValidation && validationContext.getResource() instanceof IBaseBundle
&& myExecutorService != null) {
result = validateBundleEntriesConcurrently(validationContext, theOptions);
} else {
result = validateResource(validationContext);
}
return invokeValidationCompletedHooks(null, theResource, result);
} }
/** /**
@ -246,44 +259,42 @@ public class FhirValidator {
applyDefaultValidators(); applyDefaultValidators();
if (myConcurrentBundleValidation && theResource instanceof IBaseBundle) { IValidationContext<IBaseResource> validationContext = ValidationContext.forResource(myContext, theResource, theOptions);
if (myExecutorService != null) {
return validateBundleEntriesConcurrently((IBaseBundle) theResource, theOptions); ValidationResult result;
if (myConcurrentBundleValidation && validationContext.getResource() instanceof IBaseBundle
&& myExecutorService != null) {
result = validateBundleEntriesConcurrently(validationContext, theOptions);
} else { } else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation."); result = validateResource(validationContext);
}
} }
/*if (myConcurrentBundleValidation) { return invokeValidationCompletedHooks(theResource, null, result);
if (myExecutorService != null) {
return validateResourceConcurrently(theResource, theOptions);
} else {
ourLog.error("Concurrent Bundle Validation is enabled but ExecutorService is null. Reverting to serial validation.");
}
}*/
return validateResource(theResource, theOptions);
} }
private ValidationResult validateBundleEntriesConcurrently(IBaseBundle theBundle, ValidationOptions theOptions) { private ValidationResult validateBundleEntriesConcurrently(IValidationContext<IBaseResource> theValidationContext, ValidationOptions theOptions) {
List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, theBundle); List<IBaseResource> entries = BundleUtil.toListOfResources(myContext, (IBaseBundle) theValidationContext.getResource());
List<String> entryBundlePaths = IntStream.range(0, entries.size()) // Async validation tasks
.mapToObj(index -> String.format("Bundle.entry[%d].resource.ofType(%s)", index, entries.get(index).fhirType())) List<ConcurrentValidationTask> validationTasks = IntStream.range(0, entries.size())
.collect(Collectors.toList()); .mapToObj(index -> {
IBaseResource entry = entries.get(index);
List<Future<ValidationResult>> futures = entries.stream() String entryPathPrefix = String.format("Bundle.entry[%d].resource.ofType(%s)", index, entry.fhirType());
.map(entry -> myExecutorService.submit(() -> validateResource(entry, theOptions))) Future<ValidationResult> future = myExecutorService.submit(() -> {
.collect(Collectors.toList()); IValidationContext<IBaseResource> entryValidationContext = ValidationContext.forResource(theValidationContext.getFhirContext(), entry, theOptions);
return validateResource(entryValidationContext);
});
return new ConcurrentValidationTask(entryPathPrefix, future);
}).collect(Collectors.toList());
List<SingleValidationMessage> validationMessages = new ArrayList<>(); List<SingleValidationMessage> validationMessages = new ArrayList<>();
try { try {
for (int i = 0; i < futures.size(); i++) { for (ConcurrentValidationTask validationTask : validationTasks) {
ValidationResult result = futures.get(i).get(); ValidationResult result = validationTask.getFuture().get();
final String bundleEntryPath = entryBundlePaths.get(i); final String bundleEntryPathPrefix = validationTask.getResourcePathPrefix();
List<SingleValidationMessage> messages = result.getMessages().stream() List<SingleValidationMessage> messages = result.getMessages().stream()
.map(message -> { .map(message -> {
String currentPath = message.getLocationString().substring(message.getLocationString().indexOf('.')); String currentPath = message.getLocationString().substring(message.getLocationString().indexOf('.'));
message.setLocationString(bundleEntryPath + currentPath); message.setLocationString(bundleEntryPathPrefix + currentPath);
return message; return message;
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -295,16 +306,11 @@ public class FhirValidator {
return new ValidationResult(myContext, new ArrayList<>(validationMessages)); return new ValidationResult(myContext, new ArrayList<>(validationMessages));
} }
private ValidationResult validateResource(IBaseResource theResource, ValidationOptions theOptions) { private ValidationResult validateResource(IValidationContext<IBaseResource> theValidationContext) {
IValidationContext<IBaseResource> ctx = ValidationContext.forResource(myContext, theResource, theOptions);
for (IValidatorModule next : myValidators) { for (IValidatorModule next : myValidators) {
next.validateResource(ctx); next.validateResource(theValidationContext);
} }
return theValidationContext.toResult();
ValidationResult result = ctx.toResult();
result = invokeValidationCompletedHooks(ctx.getResource(), ctx.getResourceAsString(), result);
return result;
} }
private ValidationResult invokeValidationCompletedHooks(IBaseResource theResourceParsed, String theResourceRaw, ValidationResult theValidationResult) { private ValidationResult invokeValidationCompletedHooks(IBaseResource theResourceParsed, String theResourceRaw, ValidationResult theValidationResult) {
@ -323,58 +329,6 @@ public class FhirValidator {
return theValidationResult; return theValidationResult;
} }
private ValidationResult validateResourceConcurrently(IBaseResource theResource, ValidationOptions theOptions) {
List<ConcurrentValidationTask> validationTasks = new ArrayList<>();
populateConcurrentValidationTasks(theResource, theOptions, "", validationTasks);
List<SingleValidationMessage> messages = new ArrayList<>();
for (ConcurrentValidationTask validationTask : validationTasks) {
try {
ValidationResult result = validationTask.getFuture().get();
messages.addAll(result.getMessages().stream()
.map(message -> {
String currentPath = message.getLocationString().substring(message.getLocationString().indexOf('.'));
message.setLocationString(validationTask.getResourcePathPrefix() + currentPath);
return message;
}).collect(Collectors.toList()));
} catch (InterruptedException | ExecutionException exp) {
throw new InternalErrorException(exp);
}
}
return new ValidationResult(myContext, messages);
}
private void populateConcurrentValidationTasks(IBaseResource theResource, ValidationOptions theOptions, String resourcePathPrefix, List<ConcurrentValidationTask> theValidationTasks) {
if (theResource instanceof IBaseBundle) {
List<IBaseResource> bundleEntries = BundleUtil.toListOfResources(myContext, (IBaseBundle) theResource);
for (int i = 0; i < bundleEntries.size(); i++) {
IBaseResource entry = bundleEntries.get(i);
String bundleEntryPath = String.format("Bundle.entry[%d].resource.ofType(%s)", i, entry.fhirType());
populateConcurrentValidationTasks(entry, theOptions, bundleEntryPath, theValidationTasks);
}
} else if (theResource instanceof IDomainResource) {
IDomainResource domainResource = (IDomainResource) theResource;
if (domainResource.getContained().size() > 0) {
// validate contained resources
List<? extends IAnyResource> containedResources = domainResource.getContained();
for (int i = 0; i < containedResources.size(); i++) {
IBaseResource containedResource = containedResources.get(i);
String containedResourcePath = String.format("%s.contained[%d].ofType(%s)", resourcePathPrefix, i, containedResource.fhirType());
populateConcurrentValidationTasks(containedResource, theOptions, containedResourcePath, theValidationTasks);
}
} else {
theValidationTasks.add(new ConcurrentValidationTask(
resourcePathPrefix,
myExecutorService.submit(() -> validateResource(domainResource, theOptions))
));
}
} else {
theValidationTasks.add(new ConcurrentValidationTask(
resourcePathPrefix,
myExecutorService.submit(() -> validateResource(theResource, theOptions))
));
}
}
/** /**
* Optionally supplies an interceptor broadcaster that will be used to invoke validation related Pointcut events * Optionally supplies an interceptor broadcaster that will be used to invoke validation related Pointcut events
* *

View File

@ -1500,6 +1500,7 @@ public class FhirInstanceValidatorR4Test extends BaseTest {
} }
@Disabled
@Test @Test
public void testValidateBundleMultithreaded() throws IOException { public void testValidateBundleMultithreaded() throws IOException {
// setup // setup