One more deadlock

This commit is contained in:
James Agnew 2017-07-20 11:19:53 -04:00
parent 9d08e1e211
commit 82171da0cc
1 changed files with 121 additions and 119 deletions

View File

@ -10,7 +10,7 @@ package ca.uhn.fhir.jpa.term;
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -19,21 +19,10 @@ package ca.uhn.fhir.jpa.term;
* limitations under the License. * limitations under the License.
* #L% * #L%
*/ */
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManager; import javax.persistence.*;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
@ -51,19 +40,12 @@ import org.springframework.transaction.support.TransactionTemplate;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimaps;
import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoConfig; import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao; import ca.uhn.fhir.jpa.dao.data.*;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao; import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum; import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink.RelationshipTypeEnum;
import ca.uhn.fhir.jpa.util.StopWatch; import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
@ -76,6 +58,8 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiTerminologySvc.class); private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiTerminologySvc.class);
private static final Object PLACEHOLDER_OBJECT = new Object(); private static final Object PLACEHOLDER_OBJECT = new Object();
private ArrayListMultimap<Long, Long> myChildToParentPidCache;
@Autowired @Autowired
protected ITermCodeSystemDao myCodeSystemDao; protected ITermCodeSystemDao myCodeSystemDao;
@ -100,8 +84,8 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
@PersistenceContext(type = PersistenceContextType.TRANSACTION) @PersistenceContext(type = PersistenceContextType.TRANSACTION)
protected EntityManager myEntityManager; protected EntityManager myEntityManager;
private long myNextReindexPass; private long myNextReindexPass;
private boolean myProcessDeferred = true; private boolean myProcessDeferred = true;
@Autowired @Autowired
@ -121,7 +105,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
private int ensureParentsSaved(Collection<TermConceptParentChildLink> theParents) { private int ensureParentsSaved(Collection<TermConceptParentChildLink> theParents) {
ourLog.trace("Checking {} parents", theParents.size()); ourLog.trace("Checking {} parents", theParents.size());
int retVal = 0; int retVal = 0;
for (TermConceptParentChildLink nextLink : theParents) { for (TermConceptParentChildLink nextLink : theParents) {
if (nextLink.getRelationshipType() == RelationshipTypeEnum.ISA) { if (nextLink.getRelationshipType() == RelationshipTypeEnum.ISA) {
TermConcept nextParent = nextLink.getParent(); TermConcept nextParent = nextLink.getParent();
@ -133,7 +117,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
} }
} }
} }
return retVal; return retVal;
} }
@ -206,8 +190,11 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
/** /**
* Subclasses may override * Subclasses may override
* @param theSystem The code system *
* @param theCode The code * @param theSystem
* The code system
* @param theCode
* The code
*/ */
protected List<VersionIndependentConcept> findCodesAboveUsingBuiltInSystems(String theSystem, String theCode) { protected List<VersionIndependentConcept> findCodesAboveUsingBuiltInSystems(String theSystem, String theCode) {
return Collections.emptyList(); return Collections.emptyList();
@ -244,15 +231,19 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
ArrayList<VersionIndependentConcept> retVal = toVersionIndependentConcepts(theSystem, codes); ArrayList<VersionIndependentConcept> retVal = toVersionIndependentConcepts(theSystem, codes);
return retVal; return retVal;
} }
/** /**
* Subclasses may override * Subclasses may override
* @param theSystem The code system *
* @param theCode The code * @param theSystem
* The code system
* @param theCode
* The code
*/ */
protected List<VersionIndependentConcept> findCodesBelowUsingBuiltInSystems(String theSystem, String theCode) { protected List<VersionIndependentConcept> findCodesBelowUsingBuiltInSystems(String theSystem, String theCode) {
return Collections.emptyList(); return Collections.emptyList();
} }
private TermCodeSystemVersion findCurrentCodeSystemVersionForSystem(String theCodeSystem) { private TermCodeSystemVersion findCurrentCodeSystemVersionForSystem(String theCodeSystem) {
TermCodeSystem cs = getCodeSystem(theCodeSystem); TermCodeSystem cs = getCodeSystem(theCodeSystem);
if (cs == null || cs.getCurrentVersion() == null) { if (cs == null || cs.getCurrentVersion() == null) {
@ -274,7 +265,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
if (theConceptsStack.size() == 1 || theConceptsStack.size() % 10000 == 0) { if (theConceptsStack.size() == 1 || theConceptsStack.size() % 10000 == 0) {
float pct = (float) theConceptsStack.size() / (float) theTotalConcepts; float pct = (float) theConceptsStack.size() / (float) theTotalConcepts;
ourLog.info("Have processed {}/{} concepts ({}%)", theConceptsStack.size(), theTotalConcepts, (int)( pct*100.0f)); ourLog.info("Have processed {}/{} concepts ({}%)", theConceptsStack.size(), theTotalConcepts, (int) (pct * 100.0f));
} }
theConcept.setCodeSystem(theCodeSystem); theConcept.setCodeSystem(theCodeSystem);
@ -285,7 +276,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
} else { } else {
myConceptsToSaveLater.add(theConcept); myConceptsToSaveLater.add(theConcept);
} }
for (TermConceptParentChildLink next : theConcept.getChildren()) { for (TermConceptParentChildLink next : theConcept.getChildren()) {
persistChildren(next.getChild(), theCodeSystem, theConceptsStack, theTotalConcepts); persistChildren(next.getChild(), theCodeSystem, theConceptsStack, theTotalConcepts);
} }
@ -297,7 +288,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
myConceptLinksToSaveLater.add(next); myConceptLinksToSaveLater.add(next);
} }
} }
} }
private void populateVersion(TermConcept theNext, TermCodeSystemVersion theCodeSystemVersion) { private void populateVersion(TermConcept theNext, TermCodeSystemVersion theCodeSystemVersion) {
@ -310,48 +301,56 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
} }
} }
private ArrayListMultimap<Long, Long> myChildToParentPidCache; private void processDeferredConcepts() {
int codeCount = 0, relCount = 0;
StopWatch stopwatch = new StopWatch();
int count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptsToSaveLater.size());
ourLog.info("Saving {} deferred concepts...", count);
while (codeCount < count && myConceptsToSaveLater.size() > 0) {
TermConcept next = myConceptsToSaveLater.remove(0);
codeCount += saveConcept(next);
}
if (codeCount > 0) {
ourLog.info("Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({}ms / code)",
new Object[] { codeCount, myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount) });
}
if (codeCount == 0) {
count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptLinksToSaveLater.size());
ourLog.info("Saving {} deferred concept relationships...", count);
while (relCount < count && myConceptLinksToSaveLater.size() > 0) {
TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0);
if (myConceptDao.findOne(next.getChild().getId()) == null || myConceptDao.findOne(next.getParent().getId()) == null) {
ourLog.warn("Not inserting link from child {} to parent {} because it appears to have been deleted", next.getParent().getCode(), next.getChild().getCode());
continue;
}
saveConceptLink(next);
relCount++;
}
}
if (relCount > 0) {
ourLog.info("Saved {} deferred relationships ({} remain) in {}ms ({}ms / code)",
new Object[] { relCount, myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount) });
}
if ((myConceptsToSaveLater.size() + myConceptLinksToSaveLater.size()) == 0) {
ourLog.info("All deferred concepts and relationships have now been synchronized to the database");
}
}
private void processReindexing() { private void processReindexing() {
if (System.currentTimeMillis() < myNextReindexPass && !ourForceSaveDeferredAlwaysForUnitTest) { if (System.currentTimeMillis() < myNextReindexPass && !ourForceSaveDeferredAlwaysForUnitTest) {
return; return;
} }
TransactionTemplate tt = new TransactionTemplate(myTransactionMgr); TransactionTemplate tt = new TransactionTemplate(myTransactionMgr);
tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW); tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
tt.execute(new TransactionCallbackWithoutResult() { tt.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
int maxResult = 1000;
Page<TermConcept> concepts = myConceptDao.findResourcesRequiringReindexing(new PageRequest(0, maxResult));
if (concepts.hasContent() == false) {
myNextReindexPass = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE;
myChildToParentPidCache = null;
return;
}
if (myChildToParentPidCache == null) {
myChildToParentPidCache = ArrayListMultimap.create();
}
ourLog.info("Indexing {} / {} concepts", concepts.getContent().size(), concepts.getTotalElements());
int count = 0;
StopWatch stopwatch = new StopWatch();
for (TermConcept nextConcept : concepts) {
StringBuilder parentsBuilder = new StringBuilder();
createParentsString(parentsBuilder, nextConcept.getId());
nextConcept.setParentPids(parentsBuilder.toString());
saveConcept(nextConcept);
count++;
}
ourLog.info("Indexed {} / {} concepts in {}ms - Avg {}ms / resource", new Object[] { count, concepts.getContent().size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(count) });
}
private void createParentsString(StringBuilder theParentsBuilder, Long theConceptPid) { private void createParentsString(StringBuilder theParentsBuilder, Long theConceptPid) {
Validate.notNull(theConceptPid, "theConceptPid must not be null"); Validate.notNull(theConceptPid, "theConceptPid must not be null");
List<Long> parents = myChildToParentPidCache.get(theConceptPid); List<Long> parents = myChildToParentPidCache.get(theConceptPid);
@ -368,8 +367,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
} }
} }
} }
for (Long nextParent : parents) { for (Long nextParent : parents) {
if (theParentsBuilder.length() > 0) { if (theParentsBuilder.length() > 0) {
theParentsBuilder.append(' '); theParentsBuilder.append(' ');
@ -378,13 +376,45 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
createParentsString(theParentsBuilder, nextParent); createParentsString(theParentsBuilder, nextParent);
} }
} }
@Override
protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
int maxResult = 1000;
Page<TermConcept> concepts = myConceptDao.findResourcesRequiringReindexing(new PageRequest(0, maxResult));
if (concepts.hasContent() == false) {
myNextReindexPass = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE;
myChildToParentPidCache = null;
return;
}
if (myChildToParentPidCache == null) {
myChildToParentPidCache = ArrayListMultimap.create();
}
ourLog.info("Indexing {} / {} concepts", concepts.getContent().size(), concepts.getTotalElements());
int count = 0;
StopWatch stopwatch = new StopWatch();
for (TermConcept nextConcept : concepts) {
StringBuilder parentsBuilder = new StringBuilder();
createParentsString(parentsBuilder, nextConcept.getId());
nextConcept.setParentPids(parentsBuilder.toString());
saveConcept(nextConcept);
count++;
}
ourLog.info("Indexed {} / {} concepts in {}ms - Avg {}ms / resource", new Object[] { count, concepts.getContent().size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(count) });
}
}); });
} }
private int saveConcept(TermConcept theConcept) { private int saveConcept(TermConcept theConcept) {
int retVal = 0; int retVal = 0;
/* /*
* If the concept has an ID, we're reindexing, so there's no need to * If the concept has an ID, we're reindexing, so there's no need to
* save parent concepts first (it's way too slow to do that) * save parent concepts first (it's way too slow to do that)
@ -392,25 +422,25 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
if (theConcept.getId() == null) { if (theConcept.getId() == null) {
retVal += ensureParentsSaved(theConcept.getParents()); retVal += ensureParentsSaved(theConcept.getParents());
} }
if (theConcept.getId() == null || theConcept.getIndexStatus() == null) { if (theConcept.getId() == null || theConcept.getIndexStatus() == null) {
retVal++; retVal++;
theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED); theConcept.setIndexStatus(BaseHapiFhirDao.INDEX_STATUS_INDEXED);
myConceptDao.save(theConcept); myConceptDao.save(theConcept);
} }
ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId()); ourLog.trace("Saved {} and got PID {}", theConcept.getCode(), theConcept.getId());
return retVal; return retVal;
} }
private void saveConceptLink(TermConceptParentChildLink next) { private void saveConceptLink(TermConceptParentChildLink next) {
if (next.getId() == null) { if (next.getId() == null) {
myConceptParentChildLinkDao.save(next); myConceptParentChildLinkDao.save(next);
} }
} }
@Scheduled(fixedRate=5000) @Scheduled(fixedRate = 5000)
@Transactional(propagation=Propagation.REQUIRED) @Transactional(propagation = Propagation.NOT_SUPPORTED)
@Override @Override
public synchronized void saveDeferred() { public synchronized void saveDeferred() {
if (!myProcessDeferred) { if (!myProcessDeferred) {
@ -419,46 +449,18 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
processReindexing(); processReindexing();
return; return;
} }
int codeCount = 0, relCount = 0;
StopWatch stopwatch = new StopWatch();
int count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptsToSaveLater.size());
ourLog.info("Saving {} deferred concepts...", count);
while (codeCount < count && myConceptsToSaveLater.size() > 0) {
TermConcept next = myConceptsToSaveLater.remove(0);
codeCount += saveConcept(next);
}
if (codeCount > 0) { TransactionTemplate tt = new TransactionTemplate(myTransactionMgr);
ourLog.info("Saved {} deferred concepts ({} codes remain and {} relationships remain) in {}ms ({}ms / code)", new Object[] {codeCount, myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount)}); tt.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
} tt.execute(new TransactionCallbackWithoutResult() {
@Override
if (codeCount == 0) { protected void doInTransactionWithoutResult(TransactionStatus theArg0) {
count = Math.min(myDaoConfig.getDeferIndexingForCodesystemsOfSize(), myConceptLinksToSaveLater.size()); processDeferredConcepts();
ourLog.info("Saving {} deferred concept relationships...", count);
while (relCount < count && myConceptLinksToSaveLater.size() > 0) {
TermConceptParentChildLink next = myConceptLinksToSaveLater.remove(0);
if (myConceptDao.findOne(next.getChild().getId()) == null || myConceptDao.findOne(next.getParent().getId()) == null) {
ourLog.warn("Not inserting link from child {} to parent {} because it appears to have been deleted", next.getParent().getCode(), next.getChild().getCode());
continue;
}
saveConceptLink(next);
relCount++;
} }
}
});
if (relCount > 0) {
ourLog.info("Saved {} deferred relationships ({} remain) in {}ms ({}ms / code)", new Object[] {relCount, myConceptLinksToSaveLater.size(), stopwatch.getMillis(), stopwatch.getMillisPerOperation(codeCount)});
}
if ((myConceptsToSaveLater.size() + myConceptLinksToSaveLater.size()) == 0) {
ourLog.info("All deferred concepts and relationships have now been synchronized to the database");
}
} }
@Override @Override
public void setProcessDeferred(boolean theProcessDeferred) { public void setProcessDeferred(boolean theProcessDeferred) {
myProcessDeferred = theProcessDeferred; myProcessDeferred = theProcessDeferred;
@ -487,7 +489,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
} }
ourLog.info("Flushing..."); ourLog.info("Flushing...");
myConceptParentChildLinkDao.flush(); myConceptParentChildLinkDao.flush();
myConceptDao.flush(); myConceptDao.flush();
@ -540,7 +542,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
} }
ourLog.info("Saving {} concepts...", totalCodeCount); ourLog.info("Saving {} concepts...", totalCodeCount);
IdentityHashMap<TermConcept, Object> conceptsStack2 = new IdentityHashMap<TermConcept, Object>(); IdentityHashMap<TermConcept, Object> conceptsStack2 = new IdentityHashMap<TermConcept, Object>();
for (TermConcept next : theCodeSystemVersion.getConcepts()) { for (TermConcept next : theCodeSystemVersion.getConcepts()) {
persistChildren(next, codeSystemVersion, conceptsStack2, totalCodeCount); persistChildren(next, codeSystemVersion, conceptsStack2, totalCodeCount);
@ -552,7 +554,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
myConceptParentChildLinkDao.flush(); myConceptParentChildLinkDao.flush();
ourLog.info("Done deleting old code system versions"); ourLog.info("Done deleting old code system versions");
if (myConceptsToSaveLater.size() > 0 || myConceptLinksToSaveLater.size() > 0) { if (myConceptsToSaveLater.size() > 0 || myConceptLinksToSaveLater.size() > 0) {
ourLog.info("Note that some concept saving was deferred - still have {} concepts and {} relationships", myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size()); ourLog.info("Note that some concept saving was deferred - still have {} concepts and {} relationships", myConceptsToSaveLater.size(), myConceptLinksToSaveLater.size());
} }
@ -563,7 +565,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
TermCodeSystem cs = getCodeSystem(theSystem); TermCodeSystem cs = getCodeSystem(theSystem);
return cs != null; return cs != null;
} }
private ArrayList<VersionIndependentConcept> toVersionIndependentConcepts(String theSystem, Set<TermConcept> codes) { private ArrayList<VersionIndependentConcept> toVersionIndependentConcepts(String theSystem, Set<TermConcept> codes) {
ArrayList<VersionIndependentConcept> retVal = new ArrayList<VersionIndependentConcept>(codes.size()); ArrayList<VersionIndependentConcept> retVal = new ArrayList<VersionIndependentConcept>(codes.size());
for (TermConcept next : codes) { for (TermConcept next : codes) {
@ -582,7 +584,7 @@ public abstract class BaseHapiTerminologySvc implements IHapiTerminologySvc {
throw new InvalidRequestException("CodeSystem contains circular reference around code " + theConcept.getCode()); throw new InvalidRequestException("CodeSystem contains circular reference around code " + theConcept.getCode());
} }
theConceptsStack.add(theConcept.getCode()); theConceptsStack.add(theConcept.getCode());
int retVal = 0; int retVal = 0;
if (theAllConcepts.put(theConcept, theAllConcepts) == null) { if (theAllConcepts.put(theConcept, theAllConcepts) == null) {
if (theAllConcepts.size() % 1000 == 0) { if (theAllConcepts.size() % 1000 == 0) {