diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvc.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvc.java index a2b3d0fa981..de92bb232ce 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvc.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/term/BaseHapiTerminologySvc.java @@ -10,7 +10,7 @@ package ca.uhn.fhir.jpa.term; * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,21 +19,10 @@ package ca.uhn.fhir.jpa.term; * limitations under the License. * #L% */ - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; -import javax.persistence.PersistenceContextType; +import javax.persistence.*; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.time.DateUtils; @@ -51,19 +40,12 @@ import org.springframework.transaction.support.TransactionTemplate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimaps; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.dao.DaoConfig; -import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; -import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao; -import ca.uhn.fhir.jpa.dao.data.ITermConceptDao; -import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao; -import ca.uhn.fhir.jpa.entity.TermCodeSystem; -import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion; -import ca.uhn.fhir.jpa.entity.TermConcept; -import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink; +import ca.uhn.fhir.jpa.dao.data.*; +import ca.uhn.fhir.jpa.entity.*; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum; import ca.uhn.fhir.jpa.util.StopWatch; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; @@ -76,6 +58,8 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiTerminologySvc.class); private static final Object PLACEHOLDER_OBJECT = new Object(); + private ArrayListMultimap myChildToParentPidCache; + @Autowired protected ITermCodeSystemDao myCodeSystemDao; @@ -100,8 +84,8 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { @PersistenceContext(type = PersistenceContextType.TRANSACTION) protected EntityManager myEntityManager; - private long myNextReindexPass; + private boolean myProcessDeferred = true; @Autowired @@ -121,7 +105,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { private int ensureParentsSaved(Collection theParents) { ourLog.trace("Checking {} parents", theParents.size()); int retVal = 0; - + for (TermConceptParentChildLink nextLink : theParents) { if (nextLink.getRelationshipType() == RelationshipTypeEnum.ISA) { TermConcept nextParent = nextLink.getParent(); @@ -133,7 +117,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { } } } - + return retVal; } @@ -206,8 +190,11 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { /** * Subclasses may override - * @param theSystem The code system - * @param theCode The code + * + * @param theSystem + * The code system + * @param theCode + * The code */ protected List findCodesAboveUsingBuiltInSystems(String theSystem, String theCode) { return Collections.emptyList(); @@ -244,15 +231,19 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { ArrayList retVal = toVersionIndependentConcepts(theSystem, codes); return retVal; } + /** * Subclasses may override - * @param theSystem The code system - * @param theCode The code + * + * @param theSystem + * The code system + * @param theCode + * The code */ protected List findCodesBelowUsingBuiltInSystems(String theSystem, String theCode) { return Collections.emptyList(); } - + private TermCodeSystemVersion findCurrentCodeSystemVersionForSystem(String theCodeSystem) { TermCodeSystem cs = getCodeSystem(theCodeSystem); if (cs == null || cs.getCurrentVersion() == null) { @@ -274,7 +265,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { if (theConceptsStack.size() == 1 || theConceptsStack.size() % 10000 == 0) { float pct = (float) theConceptsStack.size() / (float) theTotalConcepts; - ourLog.info("Have processed {}/{} concepts ({}%)", theConceptsStack.size(), theTotalConcepts, (int)( pct*100.0f)); + ourLog.info("Have processed {}/{} concepts ({}%)", theConceptsStack.size(), theTotalConcepts, (int) (pct * 100.0f)); } theConcept.setCodeSystem(theCodeSystem); @@ -285,7 +276,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { } else { myConceptsToSaveLater.add(theConcept); } - + for (TermConceptParentChildLink next : theConcept.getChildren()) { persistChildren(next.getChild(), theCodeSystem, theConceptsStack, theTotalConcepts); } @@ -297,7 +288,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { myConceptLinksToSaveLater.add(next); } } - + } private void populateVersion(TermConcept theNext, TermCodeSystemVersion theCodeSystemVersion) { @@ -310,48 +301,56 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { } } - private ArrayListMultimap myChildToParentPidCache; - + private void processDeferredConcepts() { + int codeCount = 0, relCount = 0; + StopWatch stopwatch = new StopWatch(); + + int count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptsToSaveLater.size()); + ourLog.info("Saving {} deferred concepts...", count); + while (codeCount < count && myConceptsToSaveLater.size() > 0) { + TermConcept next = myConceptsToSaveLater.remove(0); + codeCount += saveConcept(next); + } + + if (codeCount > 0) { + ourLog.info("Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({}ms / code)", + new Object[] { codeCount, myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount) }); + } + + if (codeCount == 0) { + count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptLinksToSaveLater.size()); + ourLog.info("Saving {} deferred concept relationships...", count); + while (relCount < count && myConceptLinksToSaveLater.size() > 0) { + TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0); + + if (myConceptDao.findOne(next.getChild().getId()) == null || myConceptDao.findOne(next.getParent().getId()) == null) { + ourLog.warn("Not inserting link from child {} to parent {} because it appears to have been deleted", next.getParent().getCode(), next.getChild().getCode()); + continue; + } + + saveConceptLink(next); + relCount++; + } + } + + if (relCount > 0) { + ourLog.info("Saved {} deferred relationships ({} remain) in {}ms ({}ms / code)", + new Object[] { relCount, myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount) }); + } + + if ((myConceptsToSaveLater.size() + myConceptLinksToSaveLater.size()) == 0) { + ourLog.info("All deferred concepts and relationships have now been synchronized to the database"); + } + } + private void processReindexing() { if (System.currentTimeMillis() < myNextReindexPass && !ourForceSaveDeferredAlwaysForUnitTest) { return; } - + TransactionTemplate tt = new TransactionTemplate(myTransactionMgr); tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); tt.execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus theArg0) { - int maxResult = 1000; - Page concepts = myConceptDao.findResourcesRequiringReindexing(new PageRequest(0, maxResult)); - if (concepts.hasContent() == false) { - myNextReindexPass = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE; - myChildToParentPidCache = null; - return; - } - - if (myChildToParentPidCache == null) { - myChildToParentPidCache = ArrayListMultimap.create(); - } - - ourLog.info("Indexing {} / {} concepts", concepts.getContent().size(), concepts.getTotalElements()); - - int count = 0; - StopWatch stopwatch = new StopWatch(); - - for (TermConcept nextConcept : concepts) { - - StringBuilder parentsBuilder = new StringBuilder(); - createParentsString(parentsBuilder, nextConcept.getId()); - nextConcept.setParentPids(parentsBuilder.toString()); - - saveConcept(nextConcept); - count++; - } - - ourLog.info("Indexed {} / {} concepts in {}ms - Avg {}ms / resource", new Object[] { count, concepts.getContent().size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(count) }); - } - private void createParentsString(StringBuilder theParentsBuilder, Long theConceptPid) { Validate.notNull(theConceptPid, "theConceptPid must not be null"); List parents = myChildToParentPidCache.get(theConceptPid); @@ -368,8 +367,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { } } } - - + for (Long nextParent : parents) { if (theParentsBuilder.length() > 0) { theParentsBuilder.append(' '); @@ -378,13 +376,45 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { createParentsString(theParentsBuilder, nextParent); } } + + @Override + protected void doInTransactionWithoutResult(TransactionStatus theArg0) { + int maxResult = 1000; + Page concepts = myConceptDao.findResourcesRequiringReindexing(new PageRequest(0, maxResult)); + if (concepts.hasContent() == false) { + myNextReindexPass = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE; + myChildToParentPidCache = null; + return; + } + + if (myChildToParentPidCache == null) { + myChildToParentPidCache = ArrayListMultimap.create(); + } + + ourLog.info("Indexing {} / {} concepts", concepts.getContent().size(), concepts.getTotalElements()); + + int count = 0; + StopWatch stopwatch = new StopWatch(); + + for (TermConcept nextConcept : concepts) { + + StringBuilder parentsBuilder = new StringBuilder(); + createParentsString(parentsBuilder, nextConcept.getId()); + nextConcept.setParentPids(parentsBuilder.toString()); + + saveConcept(nextConcept); + count++; + } + + ourLog.info("Indexed {} / {} concepts in {}ms - Avg {}ms / resource", new Object[] { count, concepts.getContent().size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(count) }); + } }); } private int saveConcept(TermConcept theConcept) { int retVal = 0; - + /* * If the concept has an ID, we're reindexing, so there's no need to * save parent concepts first (it's way too slow to do that) @@ -392,25 +422,25 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { if (theConcept.getId() == null) { retVal += ensureParentsSaved(theConcept.getParents()); } - + if (theConcept.getId() == null || theConcept.getIndexStatus() == null) { retVal++; theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED); myConceptDao.save(theConcept); } - + ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId()); return retVal; } - + private void saveConceptLink(TermConceptParentChildLink next) { if (next.getId() == null) { myConceptParentChildLinkDao.save(next); } } - @Scheduled(fixedRate=5000) - @Transactional(propagation=Propagation.REQUIRED) + @Scheduled(fixedRate = 5000) + @Transactional(propagation = Propagation.NOT_SUPPORTED) @Override public synchronized void saveDeferred() { if (!myProcessDeferred) { @@ -419,46 +449,18 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { processReindexing(); return; } - - int codeCount = 0, relCount = 0; - StopWatch stopwatch = new StopWatch(); - - int count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptsToSaveLater.size()); - ourLog.info("Saving {} deferred concepts...", count); - while (codeCount < count && myConceptsToSaveLater.size() > 0) { - TermConcept next = myConceptsToSaveLater.remove(0); - codeCount += saveConcept(next); - } - if (codeCount > 0) { - ourLog.info("Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({}ms / code)", new Object[] {codeCount, myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount)}); - } - - if (codeCount == 0) { - count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptLinksToSaveLater.size()); - ourLog.info("Saving {} deferred concept relationships...", count); - while (relCount < count && myConceptLinksToSaveLater.size() > 0) { - TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0); - - if (myConceptDao.findOne(next.getChild().getId()) == null || myConceptDao.findOne(next.getParent().getId()) == null) { - ourLog.warn("Not inserting link from child {} to parent {} because it appears to have been deleted", next.getParent().getCode(), next.getChild().getCode()); - continue; - } - - saveConceptLink(next); - relCount++; + TransactionTemplate tt = new TransactionTemplate(myTransactionMgr); + tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); + tt.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus theArg0) { + processDeferredConcepts(); } - } - - if (relCount > 0) { - ourLog.info("Saved {} deferred relationships ({} remain) in {}ms ({}ms / code)", new Object[] {relCount, myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount)}); - } - - if ((myConceptsToSaveLater.size() + myConceptLinksToSaveLater.size()) == 0) { - ourLog.info("All deferred concepts and relationships have now been synchronized to the database"); - } + + }); } - + @Override public void setProcessDeferred(boolean theProcessDeferred) { myProcessDeferred = theProcessDeferred; @@ -487,7 +489,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { } ourLog.info("Flushing..."); - + myConceptParentChildLinkDao.flush(); myConceptDao.flush(); @@ -540,7 +542,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { } ourLog.info("Saving {} concepts...", totalCodeCount); - + IdentityHashMap conceptsStack2 = new IdentityHashMap(); for (TermConcept next : theCodeSystemVersion.getConcepts()) { persistChildren(next, codeSystemVersion, conceptsStack2, totalCodeCount); @@ -552,7 +554,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { myConceptParentChildLinkDao.flush(); ourLog.info("Done deleting old code system versions"); - + if (myConceptsToSaveLater.size() > 0 || myConceptLinksToSaveLater.size() > 0) { ourLog.info("Note that some concept saving was deferred - still have {} concepts and {} relationships", myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size()); } @@ -563,7 +565,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { TermCodeSystem cs = getCodeSystem(theSystem); return cs != null; } - + private ArrayList toVersionIndependentConcepts(String theSystem, Set codes) { ArrayList retVal = new ArrayList(codes.size()); for (TermConcept next : codes) { @@ -582,7 +584,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc { throw new InvalidRequestException("CodeSystem contains circular reference around code " + theConcept.getCode()); } theConceptsStack.add(theConcept.getCode()); - + int retVal = 0; if (theAllConcepts.put(theConcept, theAllConcepts) == null) { if (theAllConcepts.size() % 1000 == 0) {