Issue 3419 elastic search exception while pre expanding valueset with more than 10 000 concepts (#3991)
* Add test reproducing bug * Refactor ValueSet expansion hsearch queries to use scroll. * Use void method for never used return value. * Execute action when transaction is active * Update tests query counts as the CodeSystemVersion.pid is always queried now * Enable test after reproduced bug fix. Add changelog. * Implement suggestions Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com>
This commit is contained in:
parent
9b50b332a4
commit
002a7dc670
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
type: fix
|
||||
issue: 3419
|
||||
title: "With Elasticsearch configured, including terminology, an exception was raised while expanding a ValueSet
|
||||
with more than 10,000 concepts. This has now been fixed."
|
|
@ -108,6 +108,8 @@ import org.hibernate.search.engine.search.predicate.dsl.BooleanPredicateClausesS
|
|||
import org.hibernate.search.engine.search.predicate.dsl.PredicateFinalStep;
|
||||
import org.hibernate.search.engine.search.predicate.dsl.SearchPredicateFactory;
|
||||
import org.hibernate.search.engine.search.query.SearchQuery;
|
||||
import org.hibernate.search.engine.search.query.SearchScroll;
|
||||
import org.hibernate.search.engine.search.query.SearchScrollResult;
|
||||
import org.hibernate.search.mapper.orm.Search;
|
||||
import org.hibernate.search.mapper.orm.common.EntityReference;
|
||||
import org.hibernate.search.mapper.orm.session.SearchSession;
|
||||
|
@ -186,9 +188,7 @@ import java.util.Set;
|
|||
import java.util.StringTokenizer;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static ca.uhn.fhir.jpa.term.api.ITermLoaderSvc.LOINC_URI;
|
||||
|
@ -759,32 +759,17 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
// Handle includes
|
||||
ourLog.debug("Handling includes");
|
||||
for (ValueSet.ConceptSetComponent include : theValueSetToExpand.getCompose().getInclude()) {
|
||||
for (int i = 0; ; i++) {
|
||||
int queryIndex = i;
|
||||
Boolean shouldContinue = executeInNewTransactionIfNeeded(() -> {
|
||||
boolean add = true;
|
||||
return expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes, include, add, queryIndex, theExpansionFilter);
|
||||
});
|
||||
if (!shouldContinue) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
myTxTemplate.executeWithoutResult(tx ->
|
||||
expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes,
|
||||
include, true, theExpansionFilter) );
|
||||
}
|
||||
|
||||
// Handle excludes
|
||||
ourLog.debug("Handling excludes");
|
||||
for (ValueSet.ConceptSetComponent exclude : theValueSetToExpand.getCompose().getExclude()) {
|
||||
for (int i = 0; ; i++) {
|
||||
int queryIndex = i;
|
||||
Boolean shouldContinue = executeInNewTransactionIfNeeded(() -> {
|
||||
boolean add = false;
|
||||
ExpansionFilter expansionFilter = ExpansionFilter.NO_FILTER;
|
||||
return expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes, exclude, add, queryIndex, expansionFilter);
|
||||
});
|
||||
if (!shouldContinue) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
myTxTemplate.executeWithoutResult(tx ->
|
||||
expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes,
|
||||
exclude, false, ExpansionFilter.NO_FILTER) );
|
||||
}
|
||||
|
||||
if (theValueSetCodeAccumulator instanceof ValueSetConceptAccumulator) {
|
||||
|
@ -794,17 +779,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
ourLog.debug("Done working with {} in {}ms", valueSetInfo, sw.getMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute in a new transaction only if we aren't already in one. We do this because in some cases
|
||||
* when performing a VS expansion we throw an {@link ExpansionTooCostlyException} and we don't want
|
||||
* this to cause the TX to be marked a rollback prematurely.
|
||||
*/
|
||||
private <T> T executeInNewTransactionIfNeeded(Supplier<T> theAction) {
|
||||
if (TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
return theAction.get();
|
||||
}
|
||||
return myTxTemplate.execute(t -> theAction.get());
|
||||
}
|
||||
|
||||
private String getValueSetInfo(ValueSet theValueSet) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -833,7 +807,12 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
/**
|
||||
* @return Returns true if there are potentially more results to process.
|
||||
*/
|
||||
private Boolean expandValueSetHandleIncludeOrExclude(@Nullable ValueSetExpansionOptions theExpansionOptions, IValueSetConceptAccumulator theValueSetCodeAccumulator, Set<String> theAddedCodes, ValueSet.ConceptSetComponent theIncludeOrExclude, boolean theAdd, int theQueryIndex, @Nonnull ExpansionFilter theExpansionFilter) {
|
||||
private void expandValueSetHandleIncludeOrExclude(@Nullable ValueSetExpansionOptions theExpansionOptions,
|
||||
IValueSetConceptAccumulator theValueSetCodeAccumulator,
|
||||
Set<String> theAddedCodes,
|
||||
ValueSet.ConceptSetComponent theIncludeOrExclude,
|
||||
boolean theAdd,
|
||||
@Nonnull ExpansionFilter theExpansionFilter) {
|
||||
|
||||
String system = theIncludeOrExclude.getSystem();
|
||||
boolean hasSystem = isNotBlank(system);
|
||||
|
@ -842,7 +821,7 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
if (hasSystem) {
|
||||
|
||||
if (theExpansionFilter.hasCode() && theExpansionFilter.getSystem() != null && !system.equals(theExpansionFilter.getSystem())) {
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
|
||||
ourLog.debug("Starting {} expansion around CodeSystem: {}", (theAdd ? "inclusion" : "exclusion"), system);
|
||||
|
@ -850,14 +829,16 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
TermCodeSystem cs = myCodeSystemDao.findByCodeSystemUri(system);
|
||||
if (cs != null) {
|
||||
|
||||
return expandValueSetHandleIncludeOrExcludeUsingDatabase(theExpansionOptions, theValueSetCodeAccumulator, theAddedCodes, theIncludeOrExclude, theAdd, theQueryIndex, theExpansionFilter, system, cs);
|
||||
expandValueSetHandleIncludeOrExcludeUsingDatabase(theExpansionOptions, theValueSetCodeAccumulator,
|
||||
theAddedCodes, theIncludeOrExclude, theAdd, theExpansionFilter, system, cs);
|
||||
return;
|
||||
|
||||
} else {
|
||||
|
||||
if (theIncludeOrExclude.getConcept().size() > 0 && theExpansionFilter.hasCode()) {
|
||||
if (defaultString(theIncludeOrExclude.getSystem()).equals(theExpansionFilter.getSystem())) {
|
||||
if (theIncludeOrExclude.getConcept().stream().noneMatch(t -> t.getCode().equals(theExpansionFilter.getCode()))) {
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -872,14 +853,12 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
new InMemoryTerminologyServerValidationSupport(myContext).expandValueSetIncludeOrExclude(new ValidationSupportContext(provideValidationSupport()), consumer, includeOrExclude);
|
||||
} catch (InMemoryTerminologyServerValidationSupport.ExpansionCouldNotBeCompletedInternallyException e) {
|
||||
if (!theExpansionOptions.isFailOnMissingCodeSystem() && e.getFailureType() == InMemoryTerminologyServerValidationSupport.FailureType.UNKNOWN_CODE_SYSTEM) {
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
throw new InternalErrorException(Msg.code(888) + e);
|
||||
} finally {
|
||||
ConversionContext40_50.INSTANCE.close("ValueSet");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
} else if (hasValueSet) {
|
||||
|
@ -901,8 +880,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
} else {
|
||||
throw new InvalidRequestException(Msg.code(890) + "ValueSet contains " + (theAdd ? "include" : "exclude") + " criteria with no system defined");
|
||||
}
|
||||
|
@ -915,103 +892,53 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
}
|
||||
|
||||
@Nonnull
|
||||
private Boolean expandValueSetHandleIncludeOrExcludeUsingDatabase(ValueSetExpansionOptions theExpansionOptions, IValueSetConceptAccumulator theValueSetCodeAccumulator, Set<String> theAddedCodes, ValueSet.ConceptSetComponent theIncludeOrExclude, boolean theAdd, int theQueryIndex, @Nonnull ExpansionFilter theExpansionFilter, String theSystem, TermCodeSystem theCs) {
|
||||
String includeOrExcludeVersion = theIncludeOrExclude.getVersion();
|
||||
TermCodeSystemVersion csv;
|
||||
if (isEmpty(includeOrExcludeVersion)) {
|
||||
csv = theCs.getCurrentVersion();
|
||||
} else {
|
||||
csv = myCodeSystemVersionDao.findByCodeSystemPidAndVersion(theCs.getPid(), includeOrExcludeVersion);
|
||||
}
|
||||
private void expandValueSetHandleIncludeOrExcludeUsingDatabase(
|
||||
ValueSetExpansionOptions theExpansionOptions,
|
||||
IValueSetConceptAccumulator theValueSetCodeAccumulator,
|
||||
Set<String> theAddedCodes,
|
||||
ValueSet.ConceptSetComponent theIncludeOrExclude,
|
||||
boolean theAdd,
|
||||
@Nonnull ExpansionFilter theExpansionFilter,
|
||||
String theSystem,
|
||||
TermCodeSystem theCs) {
|
||||
|
||||
StopWatch fullOperationSw = new StopWatch();
|
||||
|
||||
String includeOrExcludeVersion = theIncludeOrExclude.getVersion();
|
||||
TermCodeSystemVersion termCodeSystemVersion = isEmpty(includeOrExcludeVersion)
|
||||
? theCs.getCurrentVersion()
|
||||
: myCodeSystemVersionDao.findByCodeSystemPidAndVersion(theCs.getPid(), includeOrExcludeVersion);
|
||||
|
||||
SearchSession searchSession = Search.session(myEntityManager);
|
||||
/*
|
||||
* If FullText searching is not enabled, we can handle only basic expansions
|
||||
* since we're going to do it without the database.
|
||||
*/
|
||||
if (!isHibernateSearchEnabled()) {
|
||||
expandWithoutHibernateSearch(theValueSetCodeAccumulator, csv, theAddedCodes, theIncludeOrExclude, theSystem, theAdd);
|
||||
return false;
|
||||
expandWithoutHibernateSearch(theValueSetCodeAccumulator, termCodeSystemVersion, theAddedCodes, theIncludeOrExclude, theSystem, theAdd);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Ok, let's use hibernate search to build the expansion
|
||||
*/
|
||||
//Manually building a predicate since we need to throw it around.
|
||||
SearchPredicateFactory predicate = searchSession.scope(TermConcept.class).predicate();
|
||||
|
||||
//Build the top-level expansion on filters.
|
||||
PredicateFinalStep step = predicate.bool(b -> {
|
||||
b.must(predicate.match().field("myCodeSystemVersionPid").matching(csv.getPid()));
|
||||
int count = 0;
|
||||
|
||||
if (theExpansionFilter.hasCode()) {
|
||||
b.must(predicate.match().field("myCode").matching(theExpansionFilter.getCode()));
|
||||
}
|
||||
Optional<Integer> chunkSizeOpt = getScrollChunkSize(theAdd, theValueSetCodeAccumulator);
|
||||
if (chunkSizeOpt.isEmpty()) { return; }
|
||||
int chunkSize = chunkSizeOpt.get();
|
||||
|
||||
String codeSystemUrlAndVersion = buildCodeSystemUrlAndVersion(theSystem, includeOrExcludeVersion);
|
||||
for (ValueSet.ConceptSetFilterComponent nextFilter : theIncludeOrExclude.getFilter()) {
|
||||
handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter);
|
||||
}
|
||||
for (ValueSet.ConceptSetFilterComponent nextFilter : theExpansionFilter.getFilters()) {
|
||||
handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter);
|
||||
}
|
||||
});
|
||||
SearchProperties searchProps = buildSearchScroll(termCodeSystemVersion, theExpansionFilter, theSystem,
|
||||
theIncludeOrExclude, chunkSize, includeOrExcludeVersion);
|
||||
|
||||
List<String> codes = theIncludeOrExclude
|
||||
.getConcept()
|
||||
.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(ValueSet.ConceptReferenceComponent::getCode)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toList());
|
||||
int accumulatedBatchesSoFar = 0;
|
||||
try ( SearchScroll<EntityReference> scroll = searchProps.getSearchScroll() ) {
|
||||
|
||||
Optional<PredicateFinalStep> expansionStepOpt = buildExpansionPredicate(codes, predicate);
|
||||
final PredicateFinalStep finishedQuery = expansionStepOpt.isPresent()
|
||||
? predicate.bool().must(step).must(expansionStepOpt.get()) : step;
|
||||
ourLog.debug("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), chunkSize);
|
||||
for ( SearchScrollResult<EntityReference> chunk = scroll.next(); chunk.hasHits(); chunk = scroll.next() ) {
|
||||
int countForBatch = 0;
|
||||
|
||||
/*
|
||||
* DM 2019-08-21 - Processing slows after any ValueSets with many codes explicitly identified. This might
|
||||
* be due to the dark arts that is memory management. Will monitor but not do anything about this right now.
|
||||
*/
|
||||
|
||||
//BooleanQuery.setMaxClauseCount(SearchBuilder.getMaximumPageSize());
|
||||
//TODO GGG HS looks like we can't set max clause count, but it can be set server side.
|
||||
//BooleanQuery.setMaxClauseCount(10000);
|
||||
// JM 2-22-02-15 - Hopefully increasing maxClauseCount should be not needed anymore
|
||||
|
||||
StopWatch sw = new StopWatch();
|
||||
AtomicInteger count = new AtomicInteger(0);
|
||||
|
||||
int maxResultsPerBatch = SearchBuilder.getMaximumPageSize();
|
||||
|
||||
/*
|
||||
* If the accumulator is bounded, we may reduce the size of the query to
|
||||
* Lucene in order to be more efficient.
|
||||
*/
|
||||
if (theAdd) {
|
||||
Integer accumulatorCapacityRemaining = theValueSetCodeAccumulator.getCapacityRemaining();
|
||||
if (accumulatorCapacityRemaining != null) {
|
||||
maxResultsPerBatch = Math.min(maxResultsPerBatch, accumulatorCapacityRemaining + 1);
|
||||
}
|
||||
if (maxResultsPerBatch <= 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
ourLog.debug("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), maxResultsPerBatch);
|
||||
|
||||
StopWatch swForBatch = new StopWatch();
|
||||
AtomicInteger countForBatch = new AtomicInteger(0);
|
||||
|
||||
SearchQuery<EntityReference> termConceptsQuery = searchSession
|
||||
.search(TermConcept.class)
|
||||
.selectEntityReference()
|
||||
.where(f -> finishedQuery)
|
||||
.toQuery();
|
||||
|
||||
ourLog.trace("About to query: {}", termConceptsQuery.queryString());
|
||||
List<EntityReference> termConceptRefs = termConceptsQuery.fetchHits(theQueryIndex * maxResultsPerBatch, maxResultsPerBatch);
|
||||
List<Long> pids = termConceptRefs
|
||||
List<Long> pids = chunk.hits()
|
||||
.stream()
|
||||
.map(t -> (Long) t.id())
|
||||
.collect(Collectors.toList());
|
||||
|
@ -1019,25 +946,14 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
List<TermConcept> termConcepts = myTermConceptDao.fetchConceptsAndDesignationsByPid(pids);
|
||||
|
||||
// If the include section had multiple codes, return the codes in the same order
|
||||
if (codes.size() > 1) {
|
||||
termConcepts = new ArrayList<>(termConcepts);
|
||||
Map<String, Integer> codeToIndex = new HashMap<>(codes.size());
|
||||
for (int i = 0; i < codes.size(); i++) {
|
||||
codeToIndex.put(codes.get(i), i);
|
||||
}
|
||||
termConcepts.sort(((o1, o2) -> {
|
||||
Integer idx1 = codeToIndex.get(o1.getCode());
|
||||
Integer idx2 = codeToIndex.get(o2.getCode());
|
||||
return Comparators.nullsHigh().compare(idx1, idx2);
|
||||
}));
|
||||
}
|
||||
termConcepts = sortTermConcepts(searchProps, termConcepts);
|
||||
|
||||
int resultsInBatch = termConcepts.size();
|
||||
int firstResult = theQueryIndex * maxResultsPerBatch;// TODO GGG HS we lose the ability to check the index of the first result, so just best-guessing it here.
|
||||
// int firstResult = theQueryIndex * maxResultsPerBatch;// TODO GGG HS we lose the ability to check the index of the first result, so just best-guessing it here.
|
||||
Optional<PredicateFinalStep> expansionStepOpt = searchProps.getExpansionStepOpt();
|
||||
int delta = 0;
|
||||
for (TermConcept concept : termConcepts) {
|
||||
count.incrementAndGet();
|
||||
countForBatch.incrementAndGet();
|
||||
count++;
|
||||
countForBatch++;
|
||||
if (theAdd && expansionStepOpt.isPresent()) {
|
||||
ValueSet.ConceptReferenceComponent theIncludeConcept = getMatchedConceptIncludedInValueSet(theIncludeOrExclude, concept);
|
||||
if (theIncludeConcept != null && isNotBlank(theIncludeConcept.getDisplay())) {
|
||||
|
@ -1050,17 +966,124 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
}
|
||||
}
|
||||
|
||||
ourLog.debug("Batch expansion for {} with starting index of {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), firstResult, countForBatch, swForBatch.getMillis());
|
||||
theValueSetCodeAccumulator.incrementOrDecrementTotalConcepts(theAdd, delta);
|
||||
ourLog.debug("Batch expansion scroll for {} with offset {} produced {} results in {}ms",
|
||||
(theAdd ? "inclusion" : "exclusion"), accumulatedBatchesSoFar, chunk.hits().size(), chunk.took().toMillis());
|
||||
|
||||
if (resultsInBatch < maxResultsPerBatch) {
|
||||
ourLog.debug("Expansion for {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), count, sw.getMillis());
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
theValueSetCodeAccumulator.incrementOrDecrementTotalConcepts(theAdd, delta);
|
||||
accumulatedBatchesSoFar += countForBatch;
|
||||
|
||||
// keep session bounded
|
||||
myEntityManager.flush();
|
||||
myEntityManager.clear();
|
||||
}
|
||||
|
||||
ourLog.debug("Expansion for {} produced {} results in {}ms",
|
||||
(theAdd ? "inclusion" : "exclusion"), count, fullOperationSw.getMillis());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private List<TermConcept> sortTermConcepts(SearchProperties searchProps, List<TermConcept> termConcepts) {
|
||||
List<String> codes = searchProps.getIncludeOrExcludeCodes();
|
||||
if (codes.size() > 1) {
|
||||
termConcepts = new ArrayList<>(termConcepts);
|
||||
Map<String, Integer> codeToIndex = new HashMap<>(codes.size());
|
||||
for (int i = 0; i < codes.size(); i++) {
|
||||
codeToIndex.put(codes.get(i), i);
|
||||
}
|
||||
termConcepts.sort(((o1, o2) -> {
|
||||
Integer idx1 = codeToIndex.get(o1.getCode());
|
||||
Integer idx2 = codeToIndex.get(o2.getCode());
|
||||
return Comparators.nullsHigh().compare(idx1, idx2);
|
||||
}));
|
||||
}
|
||||
return termConcepts;
|
||||
}
|
||||
|
||||
|
||||
private Optional<Integer> getScrollChunkSize(boolean theAdd, IValueSetConceptAccumulator theValueSetCodeAccumulator) {
|
||||
int maxResultsPerBatch = SearchBuilder.getMaximumPageSize();
|
||||
|
||||
/*
|
||||
* If the accumulator is bounded, we may reduce the size of the query to
|
||||
* Lucene in order to be more efficient.
|
||||
*/
|
||||
if (theAdd) {
|
||||
Integer accumulatorCapacityRemaining = theValueSetCodeAccumulator.getCapacityRemaining();
|
||||
if (accumulatorCapacityRemaining != null) {
|
||||
maxResultsPerBatch = Math.min(maxResultsPerBatch, accumulatorCapacityRemaining + 1);
|
||||
}
|
||||
}
|
||||
return maxResultsPerBatch > 0 ? Optional.of(maxResultsPerBatch): Optional.empty();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
private SearchProperties buildSearchScroll(TermCodeSystemVersion theTermCodeSystemVersion,
|
||||
ExpansionFilter theExpansionFilter,
|
||||
String theSystem,
|
||||
ValueSet.ConceptSetComponent theIncludeOrExclude,
|
||||
Integer theScrollChunkSize, String theIncludeOrExcludeVersion) {
|
||||
SearchSession searchSession = Search.session(myEntityManager);
|
||||
//Manually building a predicate since we need to throw it around.
|
||||
SearchPredicateFactory predicate = searchSession.scope(TermConcept.class).predicate();
|
||||
|
||||
//Build the top-level expansion on filters.
|
||||
PredicateFinalStep step = predicate.bool(b -> {
|
||||
b.must(predicate.match().field("myCodeSystemVersionPid").matching(theTermCodeSystemVersion.getPid()));
|
||||
|
||||
if (theExpansionFilter.hasCode()) {
|
||||
b.must(predicate.match().field("myCode").matching(theExpansionFilter.getCode()));
|
||||
}
|
||||
|
||||
String codeSystemUrlAndVersion = buildCodeSystemUrlAndVersion(theSystem, theIncludeOrExcludeVersion);
|
||||
for (ValueSet.ConceptSetFilterComponent nextFilter : theIncludeOrExclude.getFilter()) {
|
||||
handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter);
|
||||
}
|
||||
for (ValueSet.ConceptSetFilterComponent nextFilter : theExpansionFilter.getFilters()) {
|
||||
handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter);
|
||||
}
|
||||
});
|
||||
|
||||
SearchProperties returnProps = new SearchProperties();
|
||||
|
||||
List<String> codes = theIncludeOrExclude
|
||||
.getConcept()
|
||||
.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(ValueSet.ConceptReferenceComponent::getCode)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toList());
|
||||
returnProps.setIncludeOrExcludeCodes(codes);
|
||||
|
||||
Optional<PredicateFinalStep> expansionStepOpt = buildExpansionPredicate(codes, predicate);
|
||||
final PredicateFinalStep finishedQuery = expansionStepOpt.isPresent()
|
||||
? predicate.bool().must(step).must(expansionStepOpt.get()) : step;
|
||||
returnProps.setExpansionStepOpt(expansionStepOpt);
|
||||
|
||||
/*
|
||||
* DM 2019-08-21 - Processing slows after any ValueSets with many codes explicitly identified. This might
|
||||
* be due to the dark arts that is memory management. Will monitor but not do anything about this right now.
|
||||
*/
|
||||
|
||||
//BooleanQuery.setMaxClauseCount(SearchBuilder.getMaximumPageSize());
|
||||
//TODO GGG HS looks like we can't set max clause count, but it can be set server side.
|
||||
//BooleanQuery.setMaxClauseCount(10000);
|
||||
// JM 22-02-15 - Hopefully increasing maxClauseCount should be not needed anymore
|
||||
|
||||
SearchQuery<EntityReference> termConceptsQuery = searchSession
|
||||
.search(TermConcept.class)
|
||||
.selectEntityReference()
|
||||
.where(f -> finishedQuery)
|
||||
.toQuery();
|
||||
|
||||
returnProps.setSearchScroll( termConceptsQuery.scroll(theScrollChunkSize) );
|
||||
return returnProps;
|
||||
}
|
||||
|
||||
|
||||
private ValueSet.ConceptReferenceComponent getMatchedConceptIncludedInValueSet(ValueSet.ConceptSetComponent theIncludeOrExclude, TermConcept concept) {
|
||||
return theIncludeOrExclude
|
||||
.getConcept()
|
||||
|
@ -2866,4 +2889,36 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Properties returned from method buildSearchScroll
|
||||
*/
|
||||
private final class SearchProperties {
|
||||
private SearchScroll<EntityReference> mySearchScroll;
|
||||
private Optional<PredicateFinalStep> myExpansionStepOpt;
|
||||
private List<String> myIncludeOrExcludeCodes;
|
||||
|
||||
public SearchScroll<EntityReference> getSearchScroll() {
|
||||
return mySearchScroll;
|
||||
}
|
||||
|
||||
public void setSearchScroll(SearchScroll<EntityReference> theSearchScroll) {
|
||||
mySearchScroll = theSearchScroll;
|
||||
}
|
||||
|
||||
public Optional<PredicateFinalStep> getExpansionStepOpt() {
|
||||
return myExpansionStepOpt;
|
||||
}
|
||||
|
||||
public void setExpansionStepOpt(Optional<PredicateFinalStep> theExpansionStepOpt) {
|
||||
myExpansionStepOpt = theExpansionStepOpt;
|
||||
}
|
||||
|
||||
public List<String> getIncludeOrExcludeCodes() {
|
||||
return myIncludeOrExcludeCodes;
|
||||
}
|
||||
|
||||
public void setIncludeOrExcludeCodes(List<String> theIncludeOrExcludeCodes) {
|
||||
myIncludeOrExcludeCodes = theIncludeOrExcludeCodes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,10 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
|
|||
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
|
||||
import ca.uhn.fhir.jpa.entity.TermConcept;
|
||||
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
|
||||
import ca.uhn.fhir.jpa.entity.TermValueSet;
|
||||
import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
|
||||
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
|
||||
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
|
||||
|
@ -29,22 +32,33 @@ import org.hl7.fhir.instance.model.api.IIdType;
|
|||
import org.hl7.fhir.r4.model.CodeSystem;
|
||||
import org.hl7.fhir.r4.model.CodeableConcept;
|
||||
import org.hl7.fhir.r4.model.Coding;
|
||||
import org.hl7.fhir.r4.model.Enumerations;
|
||||
import org.hl7.fhir.r4.model.ValueSet;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Slice;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
||||
import static ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc.MAKE_LOADING_VERSION_CURRENT;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hl7.fhir.common.hapi.validation.support.ValidationConstants.LOINC_ALL_VALUESET_ID;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.anyCollection;
|
||||
|
@ -212,6 +226,63 @@ public class ValueSetExpansionR4ElasticsearchIT extends BaseJpaTest {
|
|||
verify(myValueSetCodeAccumulator, times(9)).includeConceptWithDesignations(anyString(), anyString(), nullable(String.class), anyCollection(), nullable(Long.class), nullable(String.class), nullable(String.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Reproduced: https://github.com/hapifhir/hapi-fhir/issues/3419
|
||||
*/
|
||||
@Test
|
||||
public void testExpandValueSetLargerThanElasticDefaultScrollSize() {
|
||||
CodeSystem codeSystem = new CodeSystem();
|
||||
codeSystem.setUrl(CS_URL);
|
||||
codeSystem.setContent(CodeSystem.CodeSystemContentMode.NOTPRESENT);
|
||||
codeSystem.setName("SYSTEM NAME");
|
||||
codeSystem.setVersion("SYSTEM VERSION");
|
||||
IIdType id = myCodeSystemDao.create(codeSystem, mySrd).getId().toUnqualified();
|
||||
ResourceTable csResource = myResourceTableDao.findById(id.getIdPartAsLong()).orElseThrow(IllegalArgumentException::new);
|
||||
|
||||
TermCodeSystemVersion codeSystemVersion = new TermCodeSystemVersion();
|
||||
codeSystemVersion.setResource(csResource);
|
||||
|
||||
// need to be more than elastic [index.max_result_window] index level setting (default = 10_000)
|
||||
addTermConcepts(codeSystemVersion, 11_000);
|
||||
|
||||
ValueSet valueSet = getValueSetWithAllCodeSystemConcepts( codeSystemVersion.getCodeSystemVersionId() );
|
||||
|
||||
myTermCodeSystemStorageSvc.storeNewCodeSystemVersion(codeSystem, codeSystemVersion,
|
||||
new SystemRequestDetails(), Collections.singletonList(valueSet), Collections.emptyList());
|
||||
|
||||
myTerminologyDeferredStorageSvc.saveAllDeferred();
|
||||
await().atMost(10, SECONDS).until( myTerminologyDeferredStorageSvc::isStorageQueueEmpty );
|
||||
|
||||
myTermSvc.preExpandDeferredValueSetsToTerminologyTables();
|
||||
|
||||
// exception is swallowed in pre-expansion process, so let's check the ValueSet was successfully expanded
|
||||
Slice<TermValueSet> page = runInTransaction(() ->
|
||||
myTermValueSetDao.findByExpansionStatus(PageRequest.of(0, 1), TermValueSetPreExpansionStatusEnum.EXPANDED));
|
||||
assertEquals(1, page.getContent().size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
private ValueSet getValueSetWithAllCodeSystemConcepts(String theCodeSystemVersionId) {
|
||||
ValueSet vs = new ValueSet();
|
||||
vs.setId(LOINC_ALL_VALUESET_ID);
|
||||
vs.setUrl(CS_URL + "/vs");
|
||||
vs.setVersion(theCodeSystemVersionId);
|
||||
vs.setName("All LOINC codes");
|
||||
vs.setStatus(Enumerations.PublicationStatus.ACTIVE);
|
||||
vs.setDate(new Date());
|
||||
vs.setDescription("A value set that includes all LOINC codes");
|
||||
vs.getCompose().addInclude().setSystem(CS_URL).setVersion(theCodeSystemVersionId);
|
||||
return vs;
|
||||
}
|
||||
|
||||
|
||||
private void addTermConcepts(TermCodeSystemVersion theCs, int theTermConceptQty) {
|
||||
for (int i = 0; i < theTermConceptQty; i++) {
|
||||
TermConcept tc = new TermConcept(theCs, String.format("code-%05d", i));
|
||||
theCs.getConcepts().add(tc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FhirContext getFhirContext() {
|
||||
|
|
|
@ -1073,7 +1073,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv
|
|||
.execute();
|
||||
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(expansion));
|
||||
assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A", "AA", "AB", "AAA"));
|
||||
assertEquals(11, myCaptureQueriesListener.getSelectQueries().size());
|
||||
assertEquals(12, myCaptureQueriesListener.getSelectQueries().size());
|
||||
assertEquals("ValueSet \"ValueSet.url[http://example.com/my_value_set]\" has not yet been pre-expanded. Performing in-memory expansion without parameters. Current status: NOT_EXPANDED | The ValueSet is waiting to be picked up and pre-expanded by a scheduled task.", expansion.getMeta().getExtensionString(EXT_VALUESET_EXPANSION_MESSAGE));
|
||||
|
||||
// Hierarchical
|
||||
|
@ -1090,7 +1090,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv
|
|||
assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A"));
|
||||
assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains()), containsInAnyOrder("AA", "AB"));
|
||||
assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains().stream().filter(t -> t.getCode().equals("AA")).findFirst().orElseThrow(() -> new IllegalArgumentException()).getContains()), containsInAnyOrder("AAA"));
|
||||
assertEquals(12, myCaptureQueriesListener.getSelectQueries().size());
|
||||
assertEquals(13, myCaptureQueriesListener.getSelectQueries().size());
|
||||
|
||||
}
|
||||
|
||||
|
@ -1113,7 +1113,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv
|
|||
.execute();
|
||||
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(expansion));
|
||||
assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A", "AA", "AB", "AAA"));
|
||||
assertEquals(7, myCaptureQueriesListener.getSelectQueries().size());
|
||||
assertEquals(8, myCaptureQueriesListener.getSelectQueries().size());
|
||||
assertEquals("ValueSet with URL \"Unidentified ValueSet\" was expanded using an in-memory expansion", expansion.getMeta().getExtensionString(EXT_VALUESET_EXPANSION_MESSAGE));
|
||||
|
||||
// Hierarchical
|
||||
|
@ -1130,7 +1130,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv
|
|||
assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A"));
|
||||
assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains()), containsInAnyOrder("AA", "AB"));
|
||||
assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains().stream().filter(t -> t.getCode().equals("AA")).findFirst().orElseThrow(() -> new IllegalArgumentException()).getContains()), containsInAnyOrder("AAA"));
|
||||
assertEquals(10, myCaptureQueriesListener.getSelectQueries().size());
|
||||
assertEquals(11, myCaptureQueriesListener.getSelectQueries().size());
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue