diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/TermValueSetPreExpansionStatusEnum.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/TermValueSetPreExpansionStatusEnum.java index d5158cfa86c..f8a2cb673df 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/TermValueSetPreExpansionStatusEnum.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/TermValueSetPreExpansionStatusEnum.java @@ -32,9 +32,11 @@ public enum TermValueSetPreExpansionStatusEnum { /** * Sorting agnostic. */ + // FIXME: add a unit test that verifies a message exists for each code NOT_EXPANDED("notExpanded"), EXPANSION_IN_PROGRESS("expansionInProgress"), - EXPANDED("expanded"); + EXPANDED("expanded"), + FAILED_TO_EXPAND("failedToExpand"); private static Map ourValues; private String myCode; diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvcImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvcImpl.java index db92f054233..f8ddae24886 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvcImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvcImpl.java @@ -618,6 +618,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, expandValueSet(theValueSetToExpand, theValueSetCodeAccumulator, new AtomicInteger(0)); } + @SuppressWarnings("ConstantConditions") private void expandValueSet(ValueSet theValueSetToExpand, IValueSetConceptAccumulator theValueSetCodeAccumulator, AtomicInteger theCodeCounter) { Set addedCodes = new HashSet<>(); @@ -625,16 +626,32 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, ourLog.debug("Handling includes"); for (ValueSet.ConceptSetComponent include : theValueSetToExpand.getCompose().getInclude()) { ourLog.info("Working with " + identifyValueSetForLogging(theValueSetToExpand)); - boolean add = true; - expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, include, add, theCodeCounter); + for (int i = 0; ; i++) { + int finalI = i; + boolean shouldContinue = myTxTemplate.execute(t -> { + boolean add = true; + return expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, include, add, theCodeCounter, finalI); + }); + if (!shouldContinue) { + break; + } + } } // Handle excludes ourLog.debug("Handling excludes"); for (ValueSet.ConceptSetComponent exclude : theValueSetToExpand.getCompose().getExclude()) { ourLog.info("Working with " + identifyValueSetForLogging(theValueSetToExpand)); - boolean add = false; - expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, exclude, add, theCodeCounter); + for (int i = 0; ; i++) { + int finalI = i; + boolean shouldContinue = myTxTemplate.execute(t -> { + boolean add = false; + return expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, exclude, add, theCodeCounter, finalI); + }); + if (!shouldContinue) { + break; + } + } } } @@ -685,7 +702,11 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, return retVal; } - private void expandValueSetHandleIncludeOrExclude(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set theAddedCodes, ValueSet.ConceptSetComponent theInclude, boolean theAdd, AtomicInteger theCodeCounter) { + /** + * @return Returns true if there are potentially more results to return + */ + private boolean expandValueSetHandleIncludeOrExclude(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set theAddedCodes, ValueSet.ConceptSetComponent theInclude, boolean theAdd, AtomicInteger theCodeCounter, int theQueryIndex) { + String system = theInclude.getSystem(); boolean hasSystem = isNotBlank(system); boolean hasValueSet = theInclude.getValueSet().size() > 0; @@ -695,6 +716,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, TermCodeSystem cs = myCodeSystemDao.findByCodeSystemUri(system); if (cs != null) { + TermCodeSystemVersion csv = cs.getCurrentVersion(); FullTextEntityManager em = org.hibernate.search.jpa.Search.getFullTextEntityManager(myEntityManager); @@ -704,7 +726,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, */ if (myFulltextSearchSvc == null) { expandWithoutHibernateSearch(theValueSetCodeAccumulator, theAddedCodes, theInclude, system, theAdd, theCodeCounter); - return; + return false; } /* @@ -829,34 +851,31 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, int maxResultsPerBatch = 10000; jpaQuery.setMaxResults(maxResultsPerBatch); - jpaQuery.setFirstResult(0); + jpaQuery.setFirstResult(theQueryIndex * maxResultsPerBatch); ourLog.info("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), maxResultsPerBatch); - do { - StopWatch swForBatch = new StopWatch(); - AtomicInteger countForBatch = new AtomicInteger(0); + StopWatch swForBatch = new StopWatch(); + AtomicInteger countForBatch = new AtomicInteger(0); - List resultList = jpaQuery.getResultList(); - int resultsInBatch = jpaQuery.getResultSize(); - int firstResult = jpaQuery.getFirstResult(); - for (Object next : resultList) { - count.incrementAndGet(); - countForBatch.incrementAndGet(); - TermConcept concept = (TermConcept) next; - addCodeIfNotAlreadyAdded(theValueSetCodeAccumulator, theAddedCodes, concept, theAdd, theCodeCounter); - } + List resultList = jpaQuery.getResultList(); + int resultsInBatch = jpaQuery.getResultSize(); + int firstResult = jpaQuery.getFirstResult(); + for (Object next : resultList) { + count.incrementAndGet(); + countForBatch.incrementAndGet(); + TermConcept concept = (TermConcept) next; + addCodeIfNotAlreadyAdded(theValueSetCodeAccumulator, theAddedCodes, concept, theAdd, theCodeCounter); + } - ourLog.info("Batch expansion for {} with starting index of {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), firstResult, countForBatch, swForBatch.getMillis()); + ourLog.info("Batch expansion for {} with starting index of {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), firstResult, countForBatch, swForBatch.getMillis()); - if (resultsInBatch < maxResultsPerBatch) { - break; - } else { - jpaQuery.setFirstResult(firstResult + maxResultsPerBatch); - } - } while (true); - - ourLog.info("Expansion for {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), count, sw.getMillis()); + if (resultsInBatch < maxResultsPerBatch) { + ourLog.info("Expansion for {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), count, sw.getMillis()); + return false; + } else { + return true; + } } else { // No codesystem matching the URL found in the database @@ -886,8 +905,10 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, addConceptsToList(theValueSetCodeAccumulator, theAddedCodes, system, concept, theAdd); } + return false; } } else if (hasValueSet) { + for (CanonicalType nextValueSet : theInclude.getValueSet()) { ourLog.info("Starting {} expansion around ValueSet URI: {}", (theAdd ? "inclusion" : "exclusion"), nextValueSet.getValueAsString()); @@ -908,9 +929,14 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, } } + + return false; + } else { throw new InvalidRequestException("ValueSet contains " + (theAdd ? "include" : "exclude") + " criteria with no system defined"); } + + } private void expandWithoutHibernateSearch(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set theAddedCodes, ValueSet.ConceptSetComponent theInclude, String theSystem, boolean theAdd, AtomicInteger theCodeCounter) { @@ -974,7 +1000,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, */ TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_MANDATORY); - return txTemplate.execute(t->{ + return txTemplate.execute(t -> { TermCodeSystemVersion csv = findCurrentCodeSystemVersionForSystem(theCodeSystem); return myConceptDao.findByCodeSystemAndCode(csv, theCode); }); @@ -1269,7 +1295,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, } else { return saveConcept(theConcept); } - + } /** @@ -1724,7 +1750,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, ourLog.info("Done storing TermConceptMap."); } -// @Scheduled(fixedDelay = 600000) // 10 minutes. + // @Scheduled(fixedDelay = 600000) // 10 minutes. @Scheduled(fixedDelay = 60000) // FIXME: DM 2019-08-19 - Remove this! @Override public synchronized void preExpandValueSetToTerminologyTables() { @@ -1732,29 +1758,47 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc, ourLog.info("Skipping scheduled pre-expansion of ValueSets while deferred entities are being loaded."); return; } - new TransactionTemplate(myTxManager).execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus theStatus) { - boolean hasNextTermValueSetNotExpanded = true; - do { - Optional optionalTermValueSet = getNextTermValueSetNotExpanded(); - if (optionalTermValueSet.isPresent()) { - TermValueSet termValueSet = optionalTermValueSet.get(); - termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANSION_IN_PROGRESS); - myValueSetDao.saveAndFlush(termValueSet); + TransactionTemplate txTemplate = new TransactionTemplate(myTxManager); - ValueSet valueSet = getValueSetFromResourceTable(termValueSet.getResource()); + while (true) { - expandValueSet(valueSet, new ValueSetConceptAccumulator(termValueSet, myValueSetConceptDao, myValueSetConceptDesignationDao)); + TermValueSet valueSetToExpand = txTemplate.execute(t -> { + Optional optionalTermValueSet = getNextTermValueSetNotExpanded(); + if (optionalTermValueSet.isPresent() == false) { + return null; + } - termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANDED); - myValueSetDao.saveAndFlush(termValueSet); - } else { - hasNextTermValueSetNotExpanded = false; - } - } while (hasNextTermValueSetNotExpanded); + TermValueSet termValueSet = optionalTermValueSet.get(); + termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANSION_IN_PROGRESS); + return myValueSetDao.saveAndFlush(termValueSet); + }); + if (valueSetToExpand == null) { + return; } - }); + + // Ok so we have a VS to expand + try { + ValueSet valueSet = txTemplate.execute(t -> getValueSetFromResourceTable(valueSetToExpand.getResource())); + expandValueSet(valueSet, new ValueSetConceptAccumulator(valueSetToExpand, myValueSetConceptDao, myValueSetConceptDesignationDao)); + + // We're done with this guy + txTemplate.execute(t -> { + valueSetToExpand.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANDED); + myValueSetDao.saveAndFlush(valueSetToExpand); + return null; + }); + + } catch (Exception e) { + ourLog.error("Failed to expand valueset: " + e.getMessage(), e); + txTemplate.execute(t -> { + valueSetToExpand.setExpansionStatus(TermValueSetPreExpansionStatusEnum.FAILED_TO_EXPAND); + myValueSetDao.saveAndFlush(valueSetToExpand); + return null; + }); + } + } + + } private boolean isNotSafeToPreExpandValueSets() {