I think this might work now..

This commit is contained in:
James Agnew 2019-08-23 12:22:27 -04:00
parent 890555a77d
commit 10958a8e4d
2 changed files with 97 additions and 51 deletions

View File

@ -32,9 +32,11 @@ public enum TermValueSetPreExpansionStatusEnum {
/** /**
* Sorting agnostic. * Sorting agnostic.
*/ */
// FIXME: add a unit test that verifies a message exists for each code
NOT_EXPANDED("notExpanded"), NOT_EXPANDED("notExpanded"),
EXPANSION_IN_PROGRESS("expansionInProgress"), EXPANSION_IN_PROGRESS("expansionInProgress"),
EXPANDED("expanded"); EXPANDED("expanded"),
FAILED_TO_EXPAND("failedToExpand");
private static Map<String, TermValueSetPreExpansionStatusEnum> ourValues; private static Map<String, TermValueSetPreExpansionStatusEnum> ourValues;
private String myCode; private String myCode;

View File

@ -618,6 +618,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
expandValueSet(theValueSetToExpand, theValueSetCodeAccumulator, new AtomicInteger(0)); expandValueSet(theValueSetToExpand, theValueSetCodeAccumulator, new AtomicInteger(0));
} }
@SuppressWarnings("ConstantConditions")
private void expandValueSet(ValueSet theValueSetToExpand, IValueSetConceptAccumulator theValueSetCodeAccumulator, AtomicInteger theCodeCounter) { private void expandValueSet(ValueSet theValueSetToExpand, IValueSetConceptAccumulator theValueSetCodeAccumulator, AtomicInteger theCodeCounter) {
Set<String> addedCodes = new HashSet<>(); Set<String> addedCodes = new HashSet<>();
@ -625,16 +626,32 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
ourLog.debug("Handling includes"); ourLog.debug("Handling includes");
for (ValueSet.ConceptSetComponent include : theValueSetToExpand.getCompose().getInclude()) { for (ValueSet.ConceptSetComponent include : theValueSetToExpand.getCompose().getInclude()) {
ourLog.info("Working with " + identifyValueSetForLogging(theValueSetToExpand)); ourLog.info("Working with " + identifyValueSetForLogging(theValueSetToExpand));
boolean add = true; for (int i = 0; ; i++) {
expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, include, add, theCodeCounter); int finalI = i;
boolean shouldContinue = myTxTemplate.execute(t -> {
boolean add = true;
return expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, include, add, theCodeCounter, finalI);
});
if (!shouldContinue) {
break;
}
}
} }
// Handle excludes // Handle excludes
ourLog.debug("Handling excludes"); ourLog.debug("Handling excludes");
for (ValueSet.ConceptSetComponent exclude : theValueSetToExpand.getCompose().getExclude()) { for (ValueSet.ConceptSetComponent exclude : theValueSetToExpand.getCompose().getExclude()) {
ourLog.info("Working with " + identifyValueSetForLogging(theValueSetToExpand)); ourLog.info("Working with " + identifyValueSetForLogging(theValueSetToExpand));
boolean add = false; for (int i = 0; ; i++) {
expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, exclude, add, theCodeCounter); int finalI = i;
boolean shouldContinue = myTxTemplate.execute(t -> {
boolean add = false;
return expandValueSetHandleIncludeOrExclude(theValueSetCodeAccumulator, addedCodes, exclude, add, theCodeCounter, finalI);
});
if (!shouldContinue) {
break;
}
}
} }
} }
@ -685,7 +702,11 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
return retVal; return retVal;
} }
private void expandValueSetHandleIncludeOrExclude(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set<String> theAddedCodes, ValueSet.ConceptSetComponent theInclude, boolean theAdd, AtomicInteger theCodeCounter) { /**
* @return Returns true if there are potentially more results to return
*/
private boolean expandValueSetHandleIncludeOrExclude(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set<String> theAddedCodes, ValueSet.ConceptSetComponent theInclude, boolean theAdd, AtomicInteger theCodeCounter, int theQueryIndex) {
String system = theInclude.getSystem(); String system = theInclude.getSystem();
boolean hasSystem = isNotBlank(system); boolean hasSystem = isNotBlank(system);
boolean hasValueSet = theInclude.getValueSet().size() > 0; boolean hasValueSet = theInclude.getValueSet().size() > 0;
@ -695,6 +716,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
TermCodeSystem cs = myCodeSystemDao.findByCodeSystemUri(system); TermCodeSystem cs = myCodeSystemDao.findByCodeSystemUri(system);
if (cs != null) { if (cs != null) {
TermCodeSystemVersion csv = cs.getCurrentVersion(); TermCodeSystemVersion csv = cs.getCurrentVersion();
FullTextEntityManager em = org.hibernate.search.jpa.Search.getFullTextEntityManager(myEntityManager); FullTextEntityManager em = org.hibernate.search.jpa.Search.getFullTextEntityManager(myEntityManager);
@ -704,7 +726,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
*/ */
if (myFulltextSearchSvc == null) { if (myFulltextSearchSvc == null) {
expandWithoutHibernateSearch(theValueSetCodeAccumulator, theAddedCodes, theInclude, system, theAdd, theCodeCounter); expandWithoutHibernateSearch(theValueSetCodeAccumulator, theAddedCodes, theInclude, system, theAdd, theCodeCounter);
return; return false;
} }
/* /*
@ -829,34 +851,31 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
int maxResultsPerBatch = 10000; int maxResultsPerBatch = 10000;
jpaQuery.setMaxResults(maxResultsPerBatch); jpaQuery.setMaxResults(maxResultsPerBatch);
jpaQuery.setFirstResult(0); jpaQuery.setFirstResult(theQueryIndex * maxResultsPerBatch);
ourLog.info("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), maxResultsPerBatch); ourLog.info("Beginning batch expansion for {} with max results per batch: {}", (theAdd ? "inclusion" : "exclusion"), maxResultsPerBatch);
do { StopWatch swForBatch = new StopWatch();
StopWatch swForBatch = new StopWatch(); AtomicInteger countForBatch = new AtomicInteger(0);
AtomicInteger countForBatch = new AtomicInteger(0);
List resultList = jpaQuery.getResultList(); List resultList = jpaQuery.getResultList();
int resultsInBatch = jpaQuery.getResultSize(); int resultsInBatch = jpaQuery.getResultSize();
int firstResult = jpaQuery.getFirstResult(); int firstResult = jpaQuery.getFirstResult();
for (Object next : resultList) { for (Object next : resultList) {
count.incrementAndGet(); count.incrementAndGet();
countForBatch.incrementAndGet(); countForBatch.incrementAndGet();
TermConcept concept = (TermConcept) next; TermConcept concept = (TermConcept) next;
addCodeIfNotAlreadyAdded(theValueSetCodeAccumulator, theAddedCodes, concept, theAdd, theCodeCounter); addCodeIfNotAlreadyAdded(theValueSetCodeAccumulator, theAddedCodes, concept, theAdd, theCodeCounter);
} }
ourLog.info("Batch expansion for {} with starting index of {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), firstResult, countForBatch, swForBatch.getMillis()); ourLog.info("Batch expansion for {} with starting index of {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), firstResult, countForBatch, swForBatch.getMillis());
if (resultsInBatch < maxResultsPerBatch) { if (resultsInBatch < maxResultsPerBatch) {
break; ourLog.info("Expansion for {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), count, sw.getMillis());
} else { return false;
jpaQuery.setFirstResult(firstResult + maxResultsPerBatch); } else {
} return true;
} while (true); }
ourLog.info("Expansion for {} produced {} results in {}ms", (theAdd ? "inclusion" : "exclusion"), count, sw.getMillis());
} else { } else {
// No codesystem matching the URL found in the database // No codesystem matching the URL found in the database
@ -886,8 +905,10 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
addConceptsToList(theValueSetCodeAccumulator, theAddedCodes, system, concept, theAdd); addConceptsToList(theValueSetCodeAccumulator, theAddedCodes, system, concept, theAdd);
} }
return false;
} }
} else if (hasValueSet) { } else if (hasValueSet) {
for (CanonicalType nextValueSet : theInclude.getValueSet()) { for (CanonicalType nextValueSet : theInclude.getValueSet()) {
ourLog.info("Starting {} expansion around ValueSet URI: {}", (theAdd ? "inclusion" : "exclusion"), nextValueSet.getValueAsString()); ourLog.info("Starting {} expansion around ValueSet URI: {}", (theAdd ? "inclusion" : "exclusion"), nextValueSet.getValueAsString());
@ -908,9 +929,14 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
} }
} }
return false;
} else { } else {
throw new InvalidRequestException("ValueSet contains " + (theAdd ? "include" : "exclude") + " criteria with no system defined"); throw new InvalidRequestException("ValueSet contains " + (theAdd ? "include" : "exclude") + " criteria with no system defined");
} }
} }
private void expandWithoutHibernateSearch(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set<String> theAddedCodes, ValueSet.ConceptSetComponent theInclude, String theSystem, boolean theAdd, AtomicInteger theCodeCounter) { private void expandWithoutHibernateSearch(IValueSetConceptAccumulator theValueSetCodeAccumulator, Set<String> theAddedCodes, ValueSet.ConceptSetComponent theInclude, String theSystem, boolean theAdd, AtomicInteger theCodeCounter) {
@ -974,7 +1000,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
*/ */
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager); TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_MANDATORY); txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_MANDATORY);
return txTemplate.execute(t->{ return txTemplate.execute(t -> {
TermCodeSystemVersion csv = findCurrentCodeSystemVersionForSystem(theCodeSystem); TermCodeSystemVersion csv = findCurrentCodeSystemVersionForSystem(theCodeSystem);
return myConceptDao.findByCodeSystemAndCode(csv, theCode); return myConceptDao.findByCodeSystemAndCode(csv, theCode);
}); });
@ -1269,7 +1295,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
} else { } else {
return saveConcept(theConcept); return saveConcept(theConcept);
} }
} }
/** /**
@ -1724,7 +1750,7 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
ourLog.info("Done storing TermConceptMap."); ourLog.info("Done storing TermConceptMap.");
} }
// @Scheduled(fixedDelay = 600000) // 10 minutes. // @Scheduled(fixedDelay = 600000) // 10 minutes.
@Scheduled(fixedDelay = 60000) // FIXME: DM 2019-08-19 - Remove this! @Scheduled(fixedDelay = 60000) // FIXME: DM 2019-08-19 - Remove this!
@Override @Override
public synchronized void preExpandValueSetToTerminologyTables() { public synchronized void preExpandValueSetToTerminologyTables() {
@ -1732,29 +1758,47 @@ public abstract class BaseHapiTerminologySvcImpl implements IHapiTerminologySvc,
ourLog.info("Skipping scheduled pre-expansion of ValueSets while deferred entities are being loaded."); ourLog.info("Skipping scheduled pre-expansion of ValueSets while deferred entities are being loaded.");
return; return;
} }
new TransactionTemplate(myTxManager).execute(new TransactionCallbackWithoutResult() { TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
@Override
protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
boolean hasNextTermValueSetNotExpanded = true;
do {
Optional<TermValueSet> optionalTermValueSet = getNextTermValueSetNotExpanded();
if (optionalTermValueSet.isPresent()) {
TermValueSet termValueSet = optionalTermValueSet.get();
termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANSION_IN_PROGRESS);
myValueSetDao.saveAndFlush(termValueSet);
ValueSet valueSet = getValueSetFromResourceTable(termValueSet.getResource()); while (true) {
expandValueSet(valueSet, new ValueSetConceptAccumulator(termValueSet, myValueSetConceptDao, myValueSetConceptDesignationDao)); TermValueSet valueSetToExpand = txTemplate.execute(t -> {
Optional<TermValueSet> optionalTermValueSet = getNextTermValueSetNotExpanded();
if (optionalTermValueSet.isPresent() == false) {
return null;
}
termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANDED); TermValueSet termValueSet = optionalTermValueSet.get();
myValueSetDao.saveAndFlush(termValueSet); termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANSION_IN_PROGRESS);
} else { return myValueSetDao.saveAndFlush(termValueSet);
hasNextTermValueSetNotExpanded = false; });
} if (valueSetToExpand == null) {
} while (hasNextTermValueSetNotExpanded); return;
} }
});
// Ok so we have a VS to expand
try {
ValueSet valueSet = txTemplate.execute(t -> getValueSetFromResourceTable(valueSetToExpand.getResource()));
expandValueSet(valueSet, new ValueSetConceptAccumulator(valueSetToExpand, myValueSetConceptDao, myValueSetConceptDesignationDao));
// We're done with this guy
txTemplate.execute(t -> {
valueSetToExpand.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANDED);
myValueSetDao.saveAndFlush(valueSetToExpand);
return null;
});
} catch (Exception e) {
ourLog.error("Failed to expand valueset: " + e.getMessage(), e);
txTemplate.execute(t -> {
valueSetToExpand.setExpansionStatus(TermValueSetPreExpansionStatusEnum.FAILED_TO_EXPAND);
myValueSetDao.saveAndFlush(valueSetToExpand);
return null;
});
}
}
} }
private boolean isNotSafeToPreExpandValueSets() { private boolean isNotSafeToPreExpandValueSets() {