Defer large term delta adds (#1736)
* Defer large term delta adds * Add changelog
This commit is contained in:
parent
5bc554949b
commit
8c78e465b1
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
type: add
|
||||
issue: 1736
|
||||
title: When performing large terminology concept additions via the delta addition service, concepts will
|
||||
now be added via the deferred storage service, meaning that they will be added in small incremental batches
|
||||
instead of as a part of one large transaction. This helps to avoid timeouts and memory issues when uploading
|
||||
large collections of concepts.
|
|
@ -45,7 +45,6 @@ import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
|
|||
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
|
||||
import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
|
||||
import ca.uhn.fhir.jpa.term.custom.CustomTerminologySet;
|
||||
import ca.uhn.fhir.jpa.util.ScrollableResultsIterator;
|
||||
import ca.uhn.fhir.rest.api.Constants;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
|
||||
|
@ -53,11 +52,7 @@ import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
|
|||
import ca.uhn.fhir.util.ObjectUtil;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import ca.uhn.fhir.util.ValidateUtil;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.hibernate.ScrollMode;
|
||||
import org.hibernate.ScrollableResults;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.CodeSystem;
|
||||
import org.hl7.fhir.r4.model.ConceptMap;
|
||||
|
@ -78,11 +73,6 @@ import javax.annotation.Nonnull;
|
|||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import javax.persistence.PersistenceContextType;
|
||||
import javax.persistence.TypedQuery;
|
||||
import javax.persistence.criteria.CriteriaBuilder;
|
||||
import javax.persistence.criteria.CriteriaQuery;
|
||||
import javax.persistence.criteria.Predicate;
|
||||
import javax.persistence.criteria.Root;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -165,11 +155,12 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
|
|||
IIdType codeSystemId = cs.getResource().getIdDt();
|
||||
|
||||
UploadStatistics retVal = new UploadStatistics(codeSystemId);
|
||||
HashMap<String, TermConcept> codeToConcept = new HashMap<>();
|
||||
|
||||
// Add root concepts
|
||||
for (TermConcept nextRootConcept : theAdditions.getRootConcepts()) {
|
||||
List<String> parentCodes = Collections.emptyList();
|
||||
addConcept(csv, parentCodes, nextRootConcept, retVal, true, 0);
|
||||
addConceptInHierarchy(csv, parentCodes, nextRootConcept, retVal, codeToConcept, 0);
|
||||
}
|
||||
|
||||
return retVal;
|
||||
|
@ -445,7 +436,7 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
|
|||
Validate.isTrue(myContext.getVersion().getVersion().isEqualOrNewerThan(FhirVersionEnum.DSTU3), "Terminology operations only supported in DSTU3+ mode");
|
||||
}
|
||||
|
||||
private void addConcept(TermCodeSystemVersion theCsv, Collection<String> theParentCodes, TermConcept theConceptToAdd, UploadStatistics theStatisticsTracker, boolean theRootConcept, int theSequence) {
|
||||
private void addConceptInHierarchy(TermCodeSystemVersion theCsv, Collection<String> theParentCodes, TermConcept theConceptToAdd, UploadStatistics theStatisticsTracker, Map<String, TermConcept> theCodeToConcept, int theSequence) {
|
||||
TermConcept conceptToAdd = theConceptToAdd;
|
||||
List<TermConceptParentChildLink> childrenToAdd = theConceptToAdd.getChildren();
|
||||
|
||||
|
@ -470,15 +461,18 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
|
|||
for (String nextParentCode : theParentCodes) {
|
||||
|
||||
// Don't add parent links that already exist for the code
|
||||
if (existingParentLinks.stream().anyMatch(t->t.getParent().getCode().equals(nextParentCode))) {
|
||||
if (existingParentLinks.stream().anyMatch(t -> t.getParent().getCode().equals(nextParentCode))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Optional<TermConcept> nextParentOpt = myConceptDao.findByCodeSystemAndCode(theCsv, nextParentCode);
|
||||
if (nextParentOpt.isPresent() == false) {
|
||||
TermConcept nextParentOpt = theCodeToConcept.get(nextParentCode);
|
||||
if (nextParentOpt == null) {
|
||||
nextParentOpt = myConceptDao.findByCodeSystemAndCode(theCsv, nextParentCode).orElse(null);
|
||||
}
|
||||
if (nextParentOpt == null) {
|
||||
throw new InvalidRequestException("Unable to add code \"" + nextCodeToAdd + "\" to unknown parent: " + nextParentCode);
|
||||
}
|
||||
parentConceptsWeShouldLinkTo.add(nextParentOpt.get());
|
||||
parentConceptsWeShouldLinkTo.add(nextParentOpt);
|
||||
}
|
||||
|
||||
if (conceptToAdd.getSequence() == null) {
|
||||
|
@ -489,10 +483,17 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
|
|||
// force a reindex, and it'll be regenerated then
|
||||
conceptToAdd.setParentPids(null);
|
||||
conceptToAdd.setCodeSystemVersion(theCsv);
|
||||
conceptToAdd = myConceptDao.save(conceptToAdd);
|
||||
|
||||
Long nextConceptPid = conceptToAdd.getId();
|
||||
Validate.notNull(nextConceptPid);
|
||||
if (theStatisticsTracker.getUpdatedConceptCount() <= myDaoConfig.getDeferIndexingForCodesystemsOfSize()) {
|
||||
conceptToAdd = myConceptDao.save(conceptToAdd);
|
||||
Long nextConceptPid = conceptToAdd.getId();
|
||||
Validate.notNull(nextConceptPid);
|
||||
} else {
|
||||
myDeferredStorageSvc.addConceptToStorageQueue(conceptToAdd);
|
||||
}
|
||||
|
||||
theCodeToConcept.put(conceptToAdd.getCode(), conceptToAdd);
|
||||
|
||||
theStatisticsTracker.incrementUpdatedConceptCount();
|
||||
|
||||
// Add link to new child to the parent
|
||||
|
@ -505,7 +506,13 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
|
|||
nextParentConcept.getChildren().add(parentLink);
|
||||
conceptToAdd.getParents().add(parentLink);
|
||||
ourLog.info("Saving parent/child link - Parent[{}] Child[{}]", parentLink.getParent().getCode(), parentLink.getChild().getCode());
|
||||
|
||||
if (theStatisticsTracker.getUpdatedConceptCount() <= myDaoConfig.getDeferIndexingForCodesystemsOfSize()) {
|
||||
myConceptParentChildLinkDao.save(parentLink);
|
||||
} else {
|
||||
myDeferredStorageSvc.addConceptLinkToStorageQueue(parentLink);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ourLog.trace("About to save parent-child links");
|
||||
|
@ -519,13 +526,20 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
|
|||
for (int i = 0; i < nextChild.getParents().size(); i++) {
|
||||
if (nextChild.getParents().get(i).getId() == null) {
|
||||
String parentCode = nextChild.getParents().get(i).getParent().getCode();
|
||||
TermConcept parentConcept = myConceptDao.findByCodeSystemAndCode(theCsv, parentCode).orElseThrow(() -> new IllegalArgumentException("Unknown parent code: " + parentCode));
|
||||
TermConcept parentConcept = theCodeToConcept.get(parentCode);
|
||||
if (parentConcept == null) {
|
||||
parentConcept = myConceptDao.findByCodeSystemAndCode(theCsv, parentCode).orElse(null);
|
||||
}
|
||||
if (parentConcept == null) {
|
||||
throw new IllegalArgumentException("Unknown parent code: " + parentCode);
|
||||
}
|
||||
|
||||
nextChild.getParents().get(i).setParent(parentConcept);
|
||||
}
|
||||
}
|
||||
|
||||
Collection<String> parentCodes = nextChild.getParents().stream().map(t -> t.getParent().getCode()).collect(Collectors.toList());
|
||||
addConcept(theCsv, parentCodes, nextChild, theStatisticsTracker, false, childIndex);
|
||||
addConceptInHierarchy(theCsv, parentCodes, nextChild, theStatisticsTracker, theCodeToConcept, childIndex);
|
||||
|
||||
childIndex++;
|
||||
}
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package ca.uhn.fhir.jpa.term;
|
||||
|
||||
import ca.uhn.fhir.context.support.IContextValidationSupport;
|
||||
import ca.uhn.fhir.jpa.dao.DaoConfig;
|
||||
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
|
||||
import ca.uhn.fhir.jpa.entity.TermConcept;
|
||||
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
|
||||
import ca.uhn.fhir.jpa.term.custom.CustomTerminologySet;
|
||||
import ca.uhn.fhir.rest.api.server.IBundleProvider;
|
||||
import ca.uhn.fhir.rest.param.UriParam;
|
||||
|
@ -16,12 +18,15 @@ import org.hl7.fhir.r4.model.Coding;
|
|||
import org.hl7.fhir.r4.model.Parameters;
|
||||
import org.hl7.fhir.r4.model.StringType;
|
||||
import org.hl7.fhir.r4.model.ValueSet;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -29,12 +34,18 @@ import java.util.stream.Collectors;
|
|||
import static org.apache.commons.lang3.StringUtils.leftPad;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TerminologySvcDeltaR4Test extends BaseJpaR4Test {
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(TerminologySvcDeltaR4Test.class);
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
myDaoConfig.setDeferIndexingForCodesystemsOfSize(new DaoConfig().getDeferIndexingForCodesystemsOfSize());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAddRootConcepts() {
|
||||
|
@ -349,6 +360,49 @@ public class TerminologySvcDeltaR4Test extends BaseJpaR4Test {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAddLargeHierarchy() {
|
||||
myDaoConfig.setDeferIndexingForCodesystemsOfSize(5);
|
||||
|
||||
createNotPresentCodeSystem();
|
||||
ValueSet vs;
|
||||
vs = expandNotPresentCodeSystem();
|
||||
assertEquals(0, vs.getExpansion().getContains().size());
|
||||
|
||||
CustomTerminologySet delta = new CustomTerminologySet();
|
||||
|
||||
// Create a nice deep hierarchy
|
||||
TermConcept concept = delta.addRootConcept("Root", "Root");
|
||||
int nestedDepth = 10;
|
||||
for (int i = 0; i < nestedDepth; i++) {
|
||||
String name = concept.getCode();
|
||||
concept = concept.addChild(TermConceptParentChildLink.RelationshipTypeEnum.ISA).setCode(name + "0").setDisplay(name + "0");
|
||||
}
|
||||
|
||||
myTermCodeSystemStorageSvc.applyDeltaCodeSystemsAdd("http://foo/cs", delta);
|
||||
|
||||
assertFalse(myTermDeferredStorageSvc.isStorageQueueEmpty());
|
||||
while (!myTermDeferredStorageSvc.isStorageQueueEmpty()) {
|
||||
myTermDeferredStorageSvc.saveDeferred();
|
||||
}
|
||||
|
||||
List<String> expectedHierarchy = new ArrayList<>();
|
||||
for (int i = 0; i < nestedDepth + 1; i++) {
|
||||
String expected = leftPad("", i, " ") +
|
||||
"Root" +
|
||||
leftPad("", i, "0") +
|
||||
" seq=0";
|
||||
expectedHierarchy.add(expected);
|
||||
}
|
||||
|
||||
assertHierarchyContains(expectedHierarchy.toArray(new String[0]));
|
||||
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private ITermDeferredStorageSvc myTermDeferredStorageSvc;
|
||||
|
||||
|
||||
@Test
|
||||
public void testAddModifiesExistingCodesInPlace() {
|
||||
|
||||
|
|
Loading…
Reference in New Issue