From 002a7dc670b38c09300e3f3d6e7040747820efee Mon Sep 17 00:00:00 2001 From: jmarchionatto <60409882+jmarchionatto@users.noreply.github.com> Date: Wed, 7 Sep 2022 15:52:38 -0400 Subject: [PATCH] Issue 3419 elastic search exception while pre expanding valueset with more than 10 000 concepts (#3991) * Add test reproducing bug * Refactor ValueSet expansion hsearch queries to use scroll. * Use void method for never used return value. * Execute action when transaction is active * Update tests query counts as the CodeSystemVersion.pid is always queried now * Enable test after reproduced bug fix. Add changelog. * Implement suggestions Co-authored-by: juan.marchionatto --- ...alueset-with-more-than-10000-concepts.yaml | 5 + .../fhir/jpa/term/BaseTermReadSvcImpl.java | 369 ++++++++++-------- .../ValueSetExpansionR4ElasticsearchIT.java | 71 ++++ ...rceProviderR4ValueSetNoVerCSNoVerTest.java | 8 +- 4 files changed, 292 insertions(+), 161 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/3419-elasticsearch-exception-expanding-valueset-with-more-than-10000-concepts.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/3419-elasticsearch-exception-expanding-valueset-with-more-than-10000-concepts.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/3419-elasticsearch-exception-expanding-valueset-with-more-than-10000-concepts.yaml new file mode 100644 index 00000000000..3fb30cc0285 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_2_0/3419-elasticsearch-exception-expanding-valueset-with-more-than-10000-concepts.yaml @@ -0,0 +1,5 @@ +--- +type: fix +issue: 3419 +title: "With Elasticsearch configured, including terminology, an exception was raised while expanding a ValueSet + with more than 10,000 concepts. This has now been fixed." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java index e7c7eedcd77..9df3c53c44e 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseTermReadSvcImpl.java @@ -108,6 +108,8 @@ import org.hibernate.search.engine.search.predicate.dsl.BooleanPredicateClausesS import org.hibernate.search.engine.search.predicate.dsl.PredicateFinalStep; import org.hibernate.search.engine.search.predicate.dsl.SearchPredicateFactory; import org.hibernate.search.engine.search.query.SearchQuery; +import org.hibernate.search.engine.search.query.SearchScroll; +import org.hibernate.search.engine.search.query.SearchScrollResult; import org.hibernate.search.mapper.orm.Search; import org.hibernate.search.mapper.orm.common.EntityReference; import org.hibernate.search.mapper.orm.session.SearchSession; @@ -186,9 +188,7 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; import static ca.uhn.fhir.jpa.term.api.ITermLoaderSvc.LOINC_URI; @@ -759,32 +759,17 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { // Handle includes ourLog.debug("Handling includes"); for (ValueSet.ConceptSetComponent include : theValueSetToExpand.getCompose().getInclude()) { - for (int i = 0; ; i++) { - int queryIndex = i; - Boolean shouldContinue = executeInNewTransactionIfNeeded(() -> { - boolean add = true; - return expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes, include, add, queryIndex, theExpansionFilter); - }); - if (!shouldContinue) { - break; - } - } + myTxTemplate.executeWithoutResult(tx -> + expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes, + include, true, theExpansionFilter) ); } // Handle excludes ourLog.debug("Handling excludes"); for (ValueSet.ConceptSetComponent exclude : theValueSetToExpand.getCompose().getExclude()) { - for (int i = 0; ; i++) { - int queryIndex = i; - Boolean shouldContinue = executeInNewTransactionIfNeeded(() -> { - boolean add = false; - ExpansionFilter expansionFilter = ExpansionFilter.NO_FILTER; - return expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes, exclude, add, queryIndex, expansionFilter); - }); - if (!shouldContinue) { - break; - } - } + myTxTemplate.executeWithoutResult(tx -> + expandValueSetHandleIncludeOrExclude(theExpansionOptions, theValueSetCodeAccumulator, addedCodes, + exclude, false, ExpansionFilter.NO_FILTER) ); } if (theValueSetCodeAccumulator instanceof ValueSetConceptAccumulator) { @@ -794,17 +779,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { ourLog.debug("Done working with {} in {}ms", valueSetInfo, sw.getMillis()); } - /** - * Execute in a new transaction only if we aren't already in one. We do this because in some cases - * when performing a VS expansion we throw an {@link ExpansionTooCostlyException} and we don't want - * this to cause the TX to be marked a rollback prematurely. - */ - private T executeInNewTransactionIfNeeded(Supplier theAction) { - if (TransactionSynchronizationManager.isSynchronizationActive()) { - return theAction.get(); - } - return myTxTemplate.execute(t -> theAction.get()); - } private String getValueSetInfo(ValueSet theValueSet) { StringBuilder sb = new StringBuilder(); @@ -833,7 +807,12 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { /** * @return Returns true if there are potentially more results to process. */ - private Boolean expandValueSetHandleIncludeOrExclude(@Nullable ValueSetExpansionOptions theExpansionOptions, IValueSetConceptAccumulator theValueSetCodeAccumulator, Set theAddedCodes, ValueSet.ConceptSetComponent theIncludeOrExclude, boolean theAdd, int theQueryIndex, @Nonnull ExpansionFilter theExpansionFilter) { + private void expandValueSetHandleIncludeOrExclude(@Nullable ValueSetExpansionOptions theExpansionOptions, + IValueSetConceptAccumulator theValueSetCodeAccumulator, + Set theAddedCodes, + ValueSet.ConceptSetComponent theIncludeOrExclude, + boolean theAdd, + @Nonnull ExpansionFilter theExpansionFilter) { String system = theIncludeOrExclude.getSystem(); boolean hasSystem = isNotBlank(system); @@ -842,7 +821,7 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { if (hasSystem) { if (theExpansionFilter.hasCode() && theExpansionFilter.getSystem() != null && !system.equals(theExpansionFilter.getSystem())) { - return false; + return; } ourLog.debug("Starting {} expansion around CodeSystem: {}", (theAdd ? "inclusion" : "exclusion"), system); @@ -850,14 +829,16 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { TermCodeSystem cs = myCodeSystemDao.findByCodeSystemUri(system); if (cs != null) { - return expandValueSetHandleIncludeOrExcludeUsingDatabase(theExpansionOptions, theValueSetCodeAccumulator, theAddedCodes, theIncludeOrExclude, theAdd, theQueryIndex, theExpansionFilter, system, cs); + expandValueSetHandleIncludeOrExcludeUsingDatabase(theExpansionOptions, theValueSetCodeAccumulator, + theAddedCodes, theIncludeOrExclude, theAdd, theExpansionFilter, system, cs); + return; } else { if (theIncludeOrExclude.getConcept().size() > 0 && theExpansionFilter.hasCode()) { if (defaultString(theIncludeOrExclude.getSystem()).equals(theExpansionFilter.getSystem())) { if (theIncludeOrExclude.getConcept().stream().noneMatch(t -> t.getCode().equals(theExpansionFilter.getCode()))) { - return false; + return; } } } @@ -872,14 +853,12 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { new InMemoryTerminologyServerValidationSupport(myContext).expandValueSetIncludeOrExclude(new ValidationSupportContext(provideValidationSupport()), consumer, includeOrExclude); } catch (InMemoryTerminologyServerValidationSupport.ExpansionCouldNotBeCompletedInternallyException e) { if (!theExpansionOptions.isFailOnMissingCodeSystem() && e.getFailureType() == InMemoryTerminologyServerValidationSupport.FailureType.UNKNOWN_CODE_SYSTEM) { - return false; + return; } throw new InternalErrorException(Msg.code(888) + e); } finally { ConversionContext40_50.INSTANCE.close("ValueSet"); } - - return false; } } else if (hasValueSet) { @@ -901,8 +880,6 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { } - return false; - } else { throw new InvalidRequestException(Msg.code(890) + "ValueSet contains " + (theAdd ? "include" : "exclude") + " criteria with no system defined"); } @@ -915,110 +892,99 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { } @Nonnull - private Boolean expandValueSetHandleIncludeOrExcludeUsingDatabase(ValueSetExpansionOptions theExpansionOptions, IValueSetConceptAccumulator theValueSetCodeAccumulator, Set theAddedCodes, ValueSet.ConceptSetComponent theIncludeOrExclude, boolean theAdd, int theQueryIndex, @Nonnull ExpansionFilter theExpansionFilter, String theSystem, TermCodeSystem theCs) { - String includeOrExcludeVersion = theIncludeOrExclude.getVersion(); - TermCodeSystemVersion csv; - if (isEmpty(includeOrExcludeVersion)) { - csv = theCs.getCurrentVersion(); - } else { - csv = myCodeSystemVersionDao.findByCodeSystemPidAndVersion(theCs.getPid(), includeOrExcludeVersion); - } + private void expandValueSetHandleIncludeOrExcludeUsingDatabase( + ValueSetExpansionOptions theExpansionOptions, + IValueSetConceptAccumulator theValueSetCodeAccumulator, + Set theAddedCodes, + ValueSet.ConceptSetComponent theIncludeOrExclude, + boolean theAdd, + @Nonnull ExpansionFilter theExpansionFilter, + String theSystem, + TermCodeSystem theCs) { + + StopWatch fullOperationSw = new StopWatch(); + + String includeOrExcludeVersion = theIncludeOrExclude.getVersion(); + TermCodeSystemVersion termCodeSystemVersion = isEmpty(includeOrExcludeVersion) + ? theCs.getCurrentVersion() + : myCodeSystemVersionDao.findByCodeSystemPidAndVersion(theCs.getPid(), includeOrExcludeVersion); - SearchSession searchSession = Search.session(myEntityManager); /* * If FullText searching is not enabled, we can handle only basic expansions * since we're going to do it without the database. */ if (!isHibernateSearchEnabled()) { - expandWithoutHibernateSearch(theValueSetCodeAccumulator, csv, theAddedCodes, theIncludeOrExclude, theSystem, theAdd); - return false; + expandWithoutHibernateSearch(theValueSetCodeAccumulator, termCodeSystemVersion, theAddedCodes, theIncludeOrExclude, theSystem, theAdd); + return; } /* * Ok, let's use hibernate search to build the expansion */ - //Manually building a predicate since we need to throw it around. - SearchPredicateFactory predicate = searchSession.scope(TermConcept.class).predicate(); - //Build the top-level expansion on filters. - PredicateFinalStep step = predicate.bool(b -> { - b.must(predicate.match().field("myCodeSystemVersionPid").matching(csv.getPid())); + int count = 0; - if (theExpansionFilter.hasCode()) { - b.must(predicate.match().field("myCode").matching(theExpansionFilter.getCode())); + Optional chunkSizeOpt = getScrollChunkSize(theAdd, theValueSetCodeAccumulator); + if (chunkSizeOpt.isEmpty()) { return; } + int chunkSize = chunkSizeOpt.get(); + + SearchProperties searchProps = buildSearchScroll(termCodeSystemVersion, theExpansionFilter, theSystem, + theIncludeOrExclude, chunkSize, includeOrExcludeVersion); + + int accumulatedBatchesSoFar = 0; + try ( SearchScroll scroll = searchProps.getSearchScroll() ) { + + ourLog.debug("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), chunkSize); + for ( SearchScrollResult chunk = scroll.next(); chunk.hasHits(); chunk = scroll.next() ) { + int countForBatch = 0; + + List pids = chunk.hits() + .stream() + .map(t -> (Long) t.id()) + .collect(Collectors.toList()); + + List termConcepts = myTermConceptDao.fetchConceptsAndDesignationsByPid(pids); + + // If the include section had multiple codes, return the codes in the same order + termConcepts = sortTermConcepts(searchProps, termConcepts); + + // int firstResult = theQueryIndex * maxResultsPerBatch;// TODO GGG HS we lose the ability to check the index of the first result, so just best-guessing it here. + Optional expansionStepOpt = searchProps.getExpansionStepOpt(); + int delta = 0; + for (TermConcept concept : termConcepts) { + count++; + countForBatch++; + if (theAdd && expansionStepOpt.isPresent()) { + ValueSet.ConceptReferenceComponent theIncludeConcept = getMatchedConceptIncludedInValueSet(theIncludeOrExclude, concept); + if (theIncludeConcept != null && isNotBlank(theIncludeConcept.getDisplay())) { + concept.setDisplay(theIncludeConcept.getDisplay()); + } + } + boolean added = addCodeIfNotAlreadyAdded(theExpansionOptions, theValueSetCodeAccumulator, theAddedCodes, concept, theAdd, includeOrExcludeVersion); + if (added) { + delta++; + } + } + + ourLog.debug("Batch expansion scroll for {} with offset {} produced {} results in {}ms", + (theAdd ? "inclusion" : "exclusion"), accumulatedBatchesSoFar, chunk.hits().size(), chunk.took().toMillis()); + + theValueSetCodeAccumulator.incrementOrDecrementTotalConcepts(theAdd, delta); + accumulatedBatchesSoFar += countForBatch; + + // keep session bounded + myEntityManager.flush(); + myEntityManager.clear(); } - String codeSystemUrlAndVersion = buildCodeSystemUrlAndVersion(theSystem, includeOrExcludeVersion); - for (ValueSet.ConceptSetFilterComponent nextFilter : theIncludeOrExclude.getFilter()) { - handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter); - } - for (ValueSet.ConceptSetFilterComponent nextFilter : theExpansionFilter.getFilters()) { - handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter); - } - }); - - List codes = theIncludeOrExclude - .getConcept() - .stream() - .filter(Objects::nonNull) - .map(ValueSet.ConceptReferenceComponent::getCode) - .filter(StringUtils::isNotBlank) - .collect(Collectors.toList()); - - Optional expansionStepOpt = buildExpansionPredicate(codes, predicate); - final PredicateFinalStep finishedQuery = expansionStepOpt.isPresent() - ? predicate.bool().must(step).must(expansionStepOpt.get()) : step; - - /* - * DM 2019-08-21 - Processing slows after any ValueSets with many codes explicitly identified. This might - * be due to the dark arts that is memory management. Will monitor but not do anything about this right now. - */ - - //BooleanQuery.setMaxClauseCount(SearchBuilder.getMaximumPageSize()); - //TODO GGG HS looks like we can't set max clause count, but it can be set server side. - //BooleanQuery.setMaxClauseCount(10000); - // JM 2-22-02-15 - Hopefully increasing maxClauseCount should be not needed anymore - - StopWatch sw = new StopWatch(); - AtomicInteger count = new AtomicInteger(0); - - int maxResultsPerBatch = SearchBuilder.getMaximumPageSize(); - - /* - * If the accumulator is bounded, we may reduce the size of the query to - * Lucene in order to be more efficient. - */ - if (theAdd) { - Integer accumulatorCapacityRemaining = theValueSetCodeAccumulator.getCapacityRemaining(); - if (accumulatorCapacityRemaining != null) { - maxResultsPerBatch = Math.min(maxResultsPerBatch, accumulatorCapacityRemaining + 1); - } - if (maxResultsPerBatch <= 0) { - return false; - } + ourLog.debug("Expansion for {} produced {} results in {}ms", + (theAdd ? "inclusion" : "exclusion"), count, fullOperationSw.getMillis()); } + } - ourLog.debug("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), maxResultsPerBatch); - StopWatch swForBatch = new StopWatch(); - AtomicInteger countForBatch = new AtomicInteger(0); - - SearchQuery termConceptsQuery = searchSession - .search(TermConcept.class) - .selectEntityReference() - .where(f -> finishedQuery) - .toQuery(); - - ourLog.trace("About to query: {}", termConceptsQuery.queryString()); - List termConceptRefs = termConceptsQuery.fetchHits(theQueryIndex * maxResultsPerBatch, maxResultsPerBatch); - List pids = termConceptRefs - .stream() - .map(t -> (Long) t.id()) - .collect(Collectors.toList()); - - List termConcepts = myTermConceptDao.fetchConceptsAndDesignationsByPid(pids); - - // If the include section had multiple codes, return the codes in the same order + private List sortTermConcepts(SearchProperties searchProps, List termConcepts) { + List codes = searchProps.getIncludeOrExcludeCodes(); if (codes.size() > 1) { termConcepts = new ArrayList<>(termConcepts); Map codeToIndex = new HashMap<>(codes.size()); @@ -1031,36 +997,93 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { return Comparators.nullsHigh().compare(idx1, idx2); })); } - - int resultsInBatch = termConcepts.size(); - int firstResult = theQueryIndex * maxResultsPerBatch;// TODO GGG HS we lose the ability to check the index of the first result, so just best-guessing it here. - int delta = 0; - for (TermConcept concept : termConcepts) { - count.incrementAndGet(); - countForBatch.incrementAndGet(); - if (theAdd && expansionStepOpt.isPresent()) { - ValueSet.ConceptReferenceComponent theIncludeConcept = getMatchedConceptIncludedInValueSet(theIncludeOrExclude, concept); - if (theIncludeConcept != null && isNotBlank(theIncludeConcept.getDisplay())) { - concept.setDisplay(theIncludeConcept.getDisplay()); - } - } - boolean added = addCodeIfNotAlreadyAdded(theExpansionOptions, theValueSetCodeAccumulator, theAddedCodes, concept, theAdd, includeOrExcludeVersion); - if (added) { - delta++; - } - } - - ourLog.debug("Batch expansion for {} with starting index of {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), firstResult, countForBatch, swForBatch.getMillis()); - theValueSetCodeAccumulator.incrementOrDecrementTotalConcepts(theAdd, delta); - - if (resultsInBatch < maxResultsPerBatch) { - ourLog.debug("Expansion for {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), count, sw.getMillis()); - return false; - } else { - return true; - } + return termConcepts; } + + private Optional getScrollChunkSize(boolean theAdd, IValueSetConceptAccumulator theValueSetCodeAccumulator) { + int maxResultsPerBatch = SearchBuilder.getMaximumPageSize(); + + /* + * If the accumulator is bounded, we may reduce the size of the query to + * Lucene in order to be more efficient. + */ + if (theAdd) { + Integer accumulatorCapacityRemaining = theValueSetCodeAccumulator.getCapacityRemaining(); + if (accumulatorCapacityRemaining != null) { + maxResultsPerBatch = Math.min(maxResultsPerBatch, accumulatorCapacityRemaining + 1); + } + } + return maxResultsPerBatch > 0 ? Optional.of(maxResultsPerBatch): Optional.empty(); + } + + + + + + private SearchProperties buildSearchScroll(TermCodeSystemVersion theTermCodeSystemVersion, + ExpansionFilter theExpansionFilter, + String theSystem, + ValueSet.ConceptSetComponent theIncludeOrExclude, + Integer theScrollChunkSize, String theIncludeOrExcludeVersion) { + SearchSession searchSession = Search.session(myEntityManager); + //Manually building a predicate since we need to throw it around. + SearchPredicateFactory predicate = searchSession.scope(TermConcept.class).predicate(); + + //Build the top-level expansion on filters. + PredicateFinalStep step = predicate.bool(b -> { + b.must(predicate.match().field("myCodeSystemVersionPid").matching(theTermCodeSystemVersion.getPid())); + + if (theExpansionFilter.hasCode()) { + b.must(predicate.match().field("myCode").matching(theExpansionFilter.getCode())); + } + + String codeSystemUrlAndVersion = buildCodeSystemUrlAndVersion(theSystem, theIncludeOrExcludeVersion); + for (ValueSet.ConceptSetFilterComponent nextFilter : theIncludeOrExclude.getFilter()) { + handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter); + } + for (ValueSet.ConceptSetFilterComponent nextFilter : theExpansionFilter.getFilters()) { + handleFilter(codeSystemUrlAndVersion, predicate, b, nextFilter); + } + }); + + SearchProperties returnProps = new SearchProperties(); + + List codes = theIncludeOrExclude + .getConcept() + .stream() + .filter(Objects::nonNull) + .map(ValueSet.ConceptReferenceComponent::getCode) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toList()); + returnProps.setIncludeOrExcludeCodes(codes); + + Optional expansionStepOpt = buildExpansionPredicate(codes, predicate); + final PredicateFinalStep finishedQuery = expansionStepOpt.isPresent() + ? predicate.bool().must(step).must(expansionStepOpt.get()) : step; + returnProps.setExpansionStepOpt(expansionStepOpt); + + /* + * DM 2019-08-21 - Processing slows after any ValueSets with many codes explicitly identified. This might + * be due to the dark arts that is memory management. Will monitor but not do anything about this right now. + */ + + //BooleanQuery.setMaxClauseCount(SearchBuilder.getMaximumPageSize()); + //TODO GGG HS looks like we can't set max clause count, but it can be set server side. + //BooleanQuery.setMaxClauseCount(10000); + // JM 22-02-15 - Hopefully increasing maxClauseCount should be not needed anymore + + SearchQuery termConceptsQuery = searchSession + .search(TermConcept.class) + .selectEntityReference() + .where(f -> finishedQuery) + .toQuery(); + + returnProps.setSearchScroll( termConceptsQuery.scroll(theScrollChunkSize) ); + return returnProps; + } + + private ValueSet.ConceptReferenceComponent getMatchedConceptIncludedInValueSet(ValueSet.ConceptSetComponent theIncludeOrExclude, TermConcept concept) { return theIncludeOrExclude .getConcept() @@ -2866,4 +2889,36 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc { } } + /** + * Properties returned from method buildSearchScroll + */ + private final class SearchProperties { + private SearchScroll mySearchScroll; + private Optional myExpansionStepOpt; + private List myIncludeOrExcludeCodes; + + public SearchScroll getSearchScroll() { + return mySearchScroll; + } + + public void setSearchScroll(SearchScroll theSearchScroll) { + mySearchScroll = theSearchScroll; + } + + public Optional getExpansionStepOpt() { + return myExpansionStepOpt; + } + + public void setExpansionStepOpt(Optional theExpansionStepOpt) { + myExpansionStepOpt = theExpansionStepOpt; + } + + public List getIncludeOrExcludeCodes() { + return myIncludeOrExcludeCodes; + } + + public void setIncludeOrExcludeCodes(List theIncludeOrExcludeCodes) { + myIncludeOrExcludeCodes = theIncludeOrExcludeCodes; + } + } } diff --git a/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java b/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java index 7e93129a881..60b3412fd62 100644 --- a/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java +++ b/hapi-fhir-jpaserver-elastic-test-utilities/src/test/java/ca/uhn/fhir/jpa/term/ValueSetExpansionR4ElasticsearchIT.java @@ -13,7 +13,10 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTableDao; import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion; import ca.uhn.fhir.jpa.entity.TermConcept; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; +import ca.uhn.fhir.jpa.entity.TermValueSet; +import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum; import ca.uhn.fhir.jpa.model.entity.ResourceTable; +import ca.uhn.fhir.jpa.partition.SystemRequestDetails; import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc; import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc; import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc; @@ -29,22 +32,33 @@ import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.r4.model.CodeSystem; import org.hl7.fhir.r4.model.CodeableConcept; import org.hl7.fhir.r4.model.Coding; +import org.hl7.fhir.r4.model.Enumerations; import org.hl7.fhir.r4.model.ValueSet; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Slice; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.transaction.PlatformTransactionManager; +import java.util.Collections; +import java.util.Date; + import static ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc.MAKE_LOADING_VERSION_CURRENT; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hl7.fhir.common.hapi.validation.support.ValidationConstants.LOINC_ALL_VALUESET_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyCollection; @@ -212,6 +226,63 @@ public class ValueSetExpansionR4ElasticsearchIT extends BaseJpaTest { verify(myValueSetCodeAccumulator, times(9)).includeConceptWithDesignations(anyString(), anyString(), nullable(String.class), anyCollection(), nullable(Long.class), nullable(String.class), nullable(String.class)); } + /** + * Reproduced: https://github.com/hapifhir/hapi-fhir/issues/3419 + */ + @Test + public void testExpandValueSetLargerThanElasticDefaultScrollSize() { + CodeSystem codeSystem = new CodeSystem(); + codeSystem.setUrl(CS_URL); + codeSystem.setContent(CodeSystem.CodeSystemContentMode.NOTPRESENT); + codeSystem.setName("SYSTEM NAME"); + codeSystem.setVersion("SYSTEM VERSION"); + IIdType id = myCodeSystemDao.create(codeSystem, mySrd).getId().toUnqualified(); + ResourceTable csResource = myResourceTableDao.findById(id.getIdPartAsLong()).orElseThrow(IllegalArgumentException::new); + + TermCodeSystemVersion codeSystemVersion = new TermCodeSystemVersion(); + codeSystemVersion.setResource(csResource); + + // need to be more than elastic [index.max_result_window] index level setting (default = 10_000) + addTermConcepts(codeSystemVersion, 11_000); + + ValueSet valueSet = getValueSetWithAllCodeSystemConcepts( codeSystemVersion.getCodeSystemVersionId() ); + + myTermCodeSystemStorageSvc.storeNewCodeSystemVersion(codeSystem, codeSystemVersion, + new SystemRequestDetails(), Collections.singletonList(valueSet), Collections.emptyList()); + + myTerminologyDeferredStorageSvc.saveAllDeferred(); + await().atMost(10, SECONDS).until( myTerminologyDeferredStorageSvc::isStorageQueueEmpty ); + + myTermSvc.preExpandDeferredValueSetsToTerminologyTables(); + + // exception is swallowed in pre-expansion process, so let's check the ValueSet was successfully expanded + Slice page = runInTransaction(() -> + myTermValueSetDao.findByExpansionStatus(PageRequest.of(0, 1), TermValueSetPreExpansionStatusEnum.EXPANDED)); + assertEquals(1, page.getContent().size()); + } + + + + private ValueSet getValueSetWithAllCodeSystemConcepts(String theCodeSystemVersionId) { + ValueSet vs = new ValueSet(); + vs.setId(LOINC_ALL_VALUESET_ID); + vs.setUrl(CS_URL + "/vs"); + vs.setVersion(theCodeSystemVersionId); + vs.setName("All LOINC codes"); + vs.setStatus(Enumerations.PublicationStatus.ACTIVE); + vs.setDate(new Date()); + vs.setDescription("A value set that includes all LOINC codes"); + vs.getCompose().addInclude().setSystem(CS_URL).setVersion(theCodeSystemVersionId); + return vs; + } + + + private void addTermConcepts(TermCodeSystemVersion theCs, int theTermConceptQty) { + for (int i = 0; i < theTermConceptQty; i++) { + TermConcept tc = new TermConcept(theCs, String.format("code-%05d", i)); + theCs.getConcepts().add(tc); + } + } @Override protected FhirContext getFhirContext() { diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java index 3e7479ad725..c93d1b6b6ba 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/ResourceProviderR4ValueSetNoVerCSNoVerTest.java @@ -1073,7 +1073,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv .execute(); ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(expansion)); assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A", "AA", "AB", "AAA")); - assertEquals(11, myCaptureQueriesListener.getSelectQueries().size()); + assertEquals(12, myCaptureQueriesListener.getSelectQueries().size()); assertEquals("ValueSet \"ValueSet.url[http://example.com/my_value_set]\" has not yet been pre-expanded. Performing in-memory expansion without parameters. Current status: NOT_EXPANDED | The ValueSet is waiting to be picked up and pre-expanded by a scheduled task.", expansion.getMeta().getExtensionString(EXT_VALUESET_EXPANSION_MESSAGE)); // Hierarchical @@ -1090,7 +1090,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A")); assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains()), containsInAnyOrder("AA", "AB")); assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains().stream().filter(t -> t.getCode().equals("AA")).findFirst().orElseThrow(() -> new IllegalArgumentException()).getContains()), containsInAnyOrder("AAA")); - assertEquals(12, myCaptureQueriesListener.getSelectQueries().size()); + assertEquals(13, myCaptureQueriesListener.getSelectQueries().size()); } @@ -1113,7 +1113,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv .execute(); ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(expansion)); assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A", "AA", "AB", "AAA")); - assertEquals(7, myCaptureQueriesListener.getSelectQueries().size()); + assertEquals(8, myCaptureQueriesListener.getSelectQueries().size()); assertEquals("ValueSet with URL \"Unidentified ValueSet\" was expanded using an in-memory expansion", expansion.getMeta().getExtensionString(EXT_VALUESET_EXPANSION_MESSAGE)); // Hierarchical @@ -1130,7 +1130,7 @@ public class ResourceProviderR4ValueSetNoVerCSNoVerTest extends BaseResourceProv assertThat(toDirectCodes(expansion.getExpansion().getContains()), containsInAnyOrder("A")); assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains()), containsInAnyOrder("AA", "AB")); assertThat(toDirectCodes(expansion.getExpansion().getContains().get(0).getContains().stream().filter(t -> t.getCode().equals("AA")).findFirst().orElseThrow(() -> new IllegalArgumentException()).getContains()), containsInAnyOrder("AAA")); - assertEquals(10, myCaptureQueriesListener.getSelectQueries().size()); + assertEquals(11, myCaptureQueriesListener.getSelectQueries().size()); }