Add type parameter to resource reindexing operation (#1921)

* Add type parameter to resource reindexing operation

* Add changelog

* CLean up reindexing
This commit is contained in:
James Agnew 2020-06-12 16:51:18 -04:00 committed by GitHub
parent 4f45c2c25d
commit 622659cbb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 174 additions and 32 deletions

View File

@ -0,0 +1,5 @@
---
type: add
issue: 1921
title: "A new optional `type` parameter has been added to the `$mark-all-resources-for-reindexing` operation, allowing
resources of a specific type to be marked."

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
@ -62,6 +63,8 @@ public class FhirResourceDaoCodeSystemDstu3 extends BaseHapiFhirResourceDao<Code
@Autowired
protected ITermCodeSystemStorageSvc myTerminologyCodeSystemStorageSvc;
@Autowired
protected ITermDeferredStorageSvc myTermDeferredStorageSvc;
@Autowired
private ITermCodeSystemDao myCsDao;
@Autowired
private IValidationSupport myValidationSupport;
@ -134,7 +137,7 @@ public class FhirResourceDaoCodeSystemDstu3 extends BaseHapiFhirResourceDao<Code
if (isNotBlank(codeSystemUrl)) {
TermCodeSystem persCs = myCsDao.findByCodeSystemUri(codeSystemUrl);
if (persCs != null) {
myTerminologyCodeSystemStorageSvc.deleteCodeSystem(persCs);
myTermDeferredStorageSvc.deleteCodeSystem(persCs);
}
}
}

View File

@ -28,6 +28,7 @@ import ca.uhn.fhir.jpa.dao.BaseHapiFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
@ -65,6 +66,8 @@ public class FhirResourceDaoCodeSystemR4 extends BaseHapiFhirResourceDao<CodeSys
protected ITermCodeSystemStorageSvc myTerminologyCodeSystemStorageSvc;
@Autowired
private FhirContext myFhirContext;
@Autowired
protected ITermDeferredStorageSvc myTermDeferredStorageSvc;
@Override
public List<IIdType> findCodeSystemIdsContainingSystemAndCode(String theCode, String theSystem, RequestDetails theRequest) {
@ -132,7 +135,7 @@ public class FhirResourceDaoCodeSystemR4 extends BaseHapiFhirResourceDao<CodeSys
if (isNotBlank(codeSystemUrl)) {
TermCodeSystem persCs = myCsDao.findByCodeSystemUri(codeSystemUrl);
if (persCs != null) {
myTerminologyCodeSystemStorageSvc.deleteCodeSystem(persCs);
myTermDeferredStorageSvc.deleteCodeSystem(persCs);
}
}
}

View File

@ -29,6 +29,7 @@ import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.dao.index.IdHelperService;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.model.cross.IBasePersistedResource;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
@ -67,6 +68,8 @@ public class FhirResourceDaoCodeSystemR5 extends BaseHapiFhirResourceDao<CodeSys
private IValidationSupport myValidationSupport;
@Autowired
private FhirContext myFhirContext;
@Autowired
protected ITermDeferredStorageSvc myTermDeferredStorageSvc;
@Override
public List<IIdType> findCodeSystemIdsContainingSystemAndCode(String theCode, String theSystem, RequestDetails theRequest) {
@ -134,7 +137,7 @@ public class FhirResourceDaoCodeSystemR5 extends BaseHapiFhirResourceDao<CodeSys
if (isNotBlank(codeSystemUrl)) {
TermCodeSystem persCs = myCsDao.findByCodeSystemUri(codeSystemUrl);
if (persCs != null) {
myTerminologyCodeSystemStorageSvc.deleteCodeSystem(persCs);
myTermDeferredStorageSvc.deleteCodeSystem(persCs);
}
}
}

View File

@ -27,14 +27,23 @@ import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public abstract class BaseJpaSystemProviderDstu2Plus<T, MT> extends BaseJpaSystemProvider<T, MT> {
@Operation(name = MARK_ALL_RESOURCES_FOR_REINDEXING, idempotent = true, returnParameters = {
@OperationParam(name = "status")
})
public IBaseResource markAllResourcesForReindexing() {
getResourceReindexingSvc().markAllResourcesForReindexing();
public IBaseResource markAllResourcesForReindexing(
@OperationParam(name="type", min = 0, max = 1, typeName = "code") IPrimitiveType<String> theType
) {
if (theType != null && isNotBlank(theType.getValueAsString())) {
getResourceReindexingSvc().markAllResourcesForReindexing(theType.getValueAsString());
} else {
getResourceReindexingSvc().markAllResourcesForReindexing();
}
IBaseParameters retVal = ParametersUtil.newInstance(getContext());

View File

@ -37,7 +37,9 @@ import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
import ca.uhn.fhir.jpa.searchparam.registry.ISearchParamRegistry;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
@ -188,8 +190,14 @@ public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
@Override
@Transactional(Transactional.TxType.REQUIRED)
public Long markAllResourcesForReindexing(String theType) {
String typeDesc;
if (isNotBlank(theType)) {
try {
myContext.getResourceType(theType);
} catch (DataFormatException e) {
throw new InvalidRequestException("Unknown resource type: " + theType);
}
myReindexJobDao.markAllOfTypeAsDeleted(theType);
typeDesc = theType;
} else {

View File

@ -40,7 +40,6 @@ import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptDesignation;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.entity.TermConceptProperty;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.term.api.ITermCodeSystemStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
@ -49,6 +48,7 @@ import ca.uhn.fhir.jpa.term.api.ITermVersionAdapterSvc;
import ca.uhn.fhir.jpa.term.custom.CustomTerminologySet;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.ObjectUtil;
@ -199,23 +199,30 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
}
@Override
@Transactional(propagation = Propagation.NEVER)
public void deleteCodeSystem(TermCodeSystem theCodeSystem) {
ourLog.info(" * Deleting code system {}", theCodeSystem.getPid());
myEntityManager.flush();
TermCodeSystem cs = myCodeSystemDao.findById(theCodeSystem.getPid()).orElseThrow(IllegalStateException::new);
cs.setCurrentVersion(null);
myCodeSystemDao.save(cs);
myCodeSystemDao.flush();
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.executeWithoutResult(t -> {
myEntityManager.flush();
TermCodeSystem cs = myCodeSystemDao.findById(theCodeSystem.getPid()).orElseThrow(IllegalStateException::new);
cs.setCurrentVersion(null);
myCodeSystemDao.save(cs);
myCodeSystemDao.flush();
});
List<TermCodeSystemVersion> codeSystemVersions = myCodeSystemVersionDao.findByCodeSystemPid(theCodeSystem.getPid());
for (TermCodeSystemVersion next : codeSystemVersions) {
deleteCodeSystemVersion(next.getPid());
}
myCodeSystemVersionDao.deleteForCodeSystem(theCodeSystem);
myCodeSystemDao.delete(theCodeSystem);
myEntityManager.flush();
txTemplate.executeWithoutResult(t -> {
myCodeSystemVersionDao.deleteForCodeSystem(theCodeSystem);
myCodeSystemDao.delete(theCodeSystem);
myEntityManager.flush();
});
}
/**
@ -679,27 +686,30 @@ public class TermCodeSystemStorageSvcImpl implements ITermCodeSystemStorageSvc {
}
@SuppressWarnings("ConstantConditions")
private <T> void doDelete(String theDescriptor, Supplier<Slice<Long>> theLoader, Supplier<Integer> theCounter, IHapiJpaRepository<T> theDao) {
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
int count;
ourLog.info(" * Deleting {}", theDescriptor);
int totalCount = theCounter.get();
int totalCount = txTemplate.execute(t -> theCounter.get());
StopWatch sw = new StopWatch();
count = 0;
while (true) {
Slice<Long> link = theLoader.get();
Slice<Long> link = txTemplate.execute(t -> theLoader.get());
if (!link.hasContent()) {
break;
}
TransactionTemplate txTemplate = new TransactionTemplate(myTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
txTemplate.execute(t -> {
link.forEach(id -> theDao.deleteByPid(id));
return null;
});
count += link.getNumberOfElements();
ourLog.info(" * {} {} deleted - {}/sec - ETA: {}", count, theDescriptor, sw.formatThroughput(count, TimeUnit.SECONDS), sw.getEstimatedTimeRemaining(count, totalCount));
ourLog.info(" * {} {} deleted ({}/{}) remaining - {}/sec - ETA: {}", count, theDescriptor, count, totalCount, sw.formatThroughput(count, TimeUnit.SECONDS), sw.getEstimatedTimeRemaining(count, totalCount));
}
theDao.flush();
}

View File

@ -21,8 +21,10 @@ package ca.uhn.fhir.jpa.term;
*/
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptParentChildLinkDao;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
@ -50,6 +52,7 @@ import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
@ -57,8 +60,11 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
@Autowired
protected ITermConceptDao myConceptDao;
@Autowired
protected ITermCodeSystemDao myCodeSystemDao;
@Autowired
protected PlatformTransactionManager myTransactionMgr;
private boolean myProcessDeferred = true;
private List<TermCodeSystem> myCodeSystemsToDelete = Collections.synchronizedList(new ArrayList<>());
private List<TermConcept> myDeferredConcepts = Collections.synchronizedList(new ArrayList<>());
private List<ValueSet> myDeferredValueSets = Collections.synchronizedList(new ArrayList<>());
private List<ConceptMap> myDeferredConceptMaps = Collections.synchronizedList(new ArrayList<>());
@ -98,6 +104,14 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
myDeferredValueSets.addAll(theValueSets);
}
@Override
@Transactional
public void deleteCodeSystem(TermCodeSystem theCodeSystem) {
theCodeSystem.setCodeSystemUri("urn:uuid:" + UUID.randomUUID().toString());
myCodeSystemDao.save(theCodeSystem);
myCodeSystemsToDelete.add(theCodeSystem);
}
@Override
public void saveAllDeferred() {
while (!isStorageQueueEmpty()) {
@ -194,7 +208,8 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
if (!isDeferredConcepts() &&
!isConceptLinksToSaveLater() &&
!isDeferredValueSets() &&
!isDeferredConceptMaps()) {
!isDeferredConceptMaps() &&
!isDeferredCodeSystemDeletions()) {
return;
}
@ -213,6 +228,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
return null;
});
}
if (isDeferredConceptMaps()) {
tt.execute(t -> {
processDeferredConceptMaps();
@ -220,9 +236,19 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
});
}
if (isDeferredCodeSystemDeletions()) {
processDeferredCodeSystemDeletions();
}
}
}
private void processDeferredCodeSystemDeletions() {
for (TermCodeSystem next : myCodeSystemsToDelete) {
myCodeSystemStorageSvc.deleteCodeSystem(next);
}
myCodeSystemsToDelete.clear();
}
@Override
public boolean isStorageQueueEmpty() {
boolean retVal = true;
@ -231,6 +257,7 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
retVal &= !isConceptLinksToSaveLater();
retVal &= !isDeferredValueSets();
retVal &= !isDeferredConceptMaps();
retVal &= !isDeferredCodeSystemDeletions();
return retVal;
}
@ -249,6 +276,10 @@ public class TermDeferredStorageSvcImpl implements ITermDeferredStorageSvc {
return isDeferredConcepts() || isConceptLinksToSaveLater();
}
private boolean isDeferredCodeSystemDeletions() {
return !myCodeSystemsToDelete.isEmpty();
}
private boolean isDeferredConcepts() {
return !myDeferredConcepts.isEmpty();
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.term.api;
* #L%
*/
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermConceptParentChildLink;
import org.hl7.fhir.r4.model.ConceptMap;
@ -51,8 +52,11 @@ public interface ITermDeferredStorageSvc {
void addValueSetsToStorageQueue(List<ValueSet> theValueSets);
void deleteCodeSystem(TermCodeSystem theCodeSystem);
/**
* This is mostly here for unit tests - Saves any and all deferred concepts and links
*/
void saveAllDeferred();
}

View File

@ -3,27 +3,24 @@ package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.jpa.term.TermReindexingSvcImpl;
import ca.uhn.fhir.util.TestUtil;
import org.apache.commons.io.IOUtils;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.CodeSystem;
import org.junit.AfterClass;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class FhirResourceDaoR4CodeSystemTest extends BaseJpaR4Test {
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
TermReindexingSvcImpl.setForceSaveDeferredAlwaysForUnitTest(false);
}
@Test
public void testIndexContained() throws Exception {
TermReindexingSvcImpl.setForceSaveDeferredAlwaysForUnitTest(true);
String input = IOUtils.toString(getClass().getResource("/r4/codesystem_complete.json"), StandardCharsets.UTF_8);
CodeSystem cs = myFhirCtx.newJsonParser().parseResource(CodeSystem.class, input);
myCodeSystemDao.create(cs, mySrd);
@ -31,10 +28,56 @@ public class FhirResourceDaoR4CodeSystemTest extends BaseJpaR4Test {
myResourceReindexingSvc.markAllResourcesForReindexing();
int outcome = myResourceReindexingSvc.forceReindexingPass();
assertNotEquals(-1, outcome); // -1 means there was a failure
myTerminologyDeferredStorageSvc.saveDeferred();
}
@Test
public void testDeleteLargeCompleteCodeSystem() {
CodeSystem cs = new CodeSystem();
cs.setContent(CodeSystem.CodeSystemContentMode.COMPLETE);
cs.setUrl("http://foo");
for (int i = 0; i < 222; i++) {
cs.addConcept().setCode("CODE" + i);
}
IIdType id = myCodeSystemDao.create(cs).getId().toUnqualifiedVersionless();
myTerminologyDeferredStorageSvc.saveDeferred();
myTerminologyDeferredStorageSvc.saveDeferred();
runInTransaction(() -> {
assertEquals(1, myTermCodeSystemDao.count());
assertNotNull(myTermCodeSystemDao.findByCodeSystemUri("http://foo"));
assertEquals(1, myTermCodeSystemVersionDao.count());
assertEquals(222, myTermConceptDao.count());
});
myCodeSystemDao.delete(id);
// Nothing is deleted initially but the URI is changed so it can't be found
runInTransaction(() -> {
assertEquals(1, myTermCodeSystemDao.count());
assertNull(myTermCodeSystemDao.findByCodeSystemUri("http://foo"));
assertEquals(1, myTermCodeSystemVersionDao.count());
assertEquals(222, myTermConceptDao.count());
});
// Now the background scheduler will do its thing
myTerminologyDeferredStorageSvc.saveDeferred();
runInTransaction(() -> {
assertEquals(0, myTermCodeSystemDao.count());
assertEquals(0, myTermCodeSystemVersionDao.count());
assertEquals(0, myTermConceptDao.count());
});
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
TermReindexingSvcImpl.setForceSaveDeferredAlwaysForUnitTest(false);
}
}

View File

@ -233,7 +233,6 @@ public class SystemProviderR4Test extends BaseJpaR4Test {
assertEquals(200, http.getStatusLine().getStatusCode());
} finally {
IOUtils.closeQuietly(http);
;
}
get = new HttpGet(ourServerBase + "/$perform-reindexing-pass");
@ -249,6 +248,30 @@ public class SystemProviderR4Test extends BaseJpaR4Test {
}
@Test
public void testMarkResourcesForReindexingTyped() throws Exception {
HttpGet get = new HttpGet(ourServerBase + "/$mark-all-resources-for-reindexing?type=Patient");
CloseableHttpResponse http = ourHttpClient.execute(get);
try {
String output = IOUtils.toString(http.getEntity().getContent(), StandardCharsets.UTF_8);
ourLog.info(output);
assertEquals(200, http.getStatusLine().getStatusCode());
} finally {
IOUtils.closeQuietly(http);
}
get = new HttpGet(ourServerBase + "/$mark-all-resources-for-reindexing?type=FOO");
http = ourHttpClient.execute(get);
try {
String output = IOUtils.toString(http.getEntity().getContent(), StandardCharsets.UTF_8);
ourLog.info(output);
assertEquals(400, http.getStatusLine().getStatusCode());
} finally {
IOUtils.closeQuietly(http);
}
}
@SuppressWarnings("deprecation")
@Test
public void testResponseUsesCorrectContentType() throws Exception {