Issue 3450 there is no way to recreate freetext indexes for terminology entities (#3463)

* Add reindex-terminology batch command

* Handle number of thread throttling and concurrency with other terminology batch operations

* Add required dbcp2 dependency

* Fix test

* Improve ConnectionPoolInfoProvider setup. Handle maximum connections.

* Remove java higher version construction

* Remove unused config

* Add reindex terminology integration test.
Reset termConcept counters before pre-expanding, which otherwise accumulate if it was pre-expanded before.

Co-authored-by: juan.marchionatto <juan.marchionatto@smilecdr.com>
This commit is contained in:
jmarchionatto 2022-03-18 08:39:12 -04:00 committed by GitHub
parent 17cf1ed9ce
commit e45f7ee9cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1342 additions and 20 deletions

View File

@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
* Last code value: 2071
* Last code value: 2072
*/
private Msg() {}

View File

@ -259,8 +259,13 @@
<artifactId>jarchivelib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</dependencies>
<build>
<plugins>

View File

@ -188,6 +188,7 @@ public abstract class BaseApp {
commands.add(new HapiFlywayMigrateDatabaseCommand());
commands.add(new CreatePackageCommand());
commands.add(new BulkImportCommand());
commands.add(new ReindexTerminologyCommand());
return commands;
}

View File

@ -0,0 +1,111 @@
package ca.uhn.fhir.cli;
/*-
* #%L
* HAPI FHIR - Command Line Client - API
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.util.ParametersUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.r4.model.Parameters;
import java.util.Optional;
import static ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider.RESP_PARAM_SUCCESS;
public class ReindexTerminologyCommand extends BaseRequestGeneratingCommand {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ReindexTerminologyCommand.class);
static final String REINDEX_TERMINOLOGY = "reindex-terminology";
@Override
public String getCommandDescription() {
return "Recreates freetext-indexes for terminology data.";
}
@Override
public String getCommandName() {
return REINDEX_TERMINOLOGY;
}
@Override
public void run(CommandLine theCommandLine) throws ParseException {
parseFhirContext(theCommandLine);
IGenericClient client = newClient(theCommandLine);
if (theCommandLine.hasOption(VERBOSE_LOGGING_PARAM)) {
client.registerInterceptor(new LoggingInterceptor(true));
}
invokeOperation(client);
}
private void invokeOperation(IGenericClient theClient) {
IBaseParameters inputParameters = ParametersUtil.newInstance(myFhirCtx);
ourLog.info("Beginning freetext indexing - This may take a while...");
IBaseParameters response;
try {
response = theClient
.operation()
.onServer()
.named(REINDEX_TERMINOLOGY)
.withNoParameters(Parameters.class)
.execute();
} catch (BaseServerResponseException e) {
if (e.getOperationOutcome() != null) {
ourLog.error("Received the following response: {}{}", NL,
myFhirCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(e.getOperationOutcome()));
}
throw e;
}
Optional<String> isSuccessResponse = ParametersUtil.getNamedParameterValueAsString(myFhirCtx, response, RESP_PARAM_SUCCESS);
if ( ! isSuccessResponse.isPresent() ) {
ParametersUtil.addParameterToParametersBoolean(myFhirCtx, response, RESP_PARAM_SUCCESS, false);
ParametersUtil.addParameterToParametersString(myFhirCtx, response, "message",
"Internal error. Command result unknown. Check system logs for details");
ourLog.info("Response:{}{}", NL, myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(response));
return;
}
boolean succeeded = Boolean.parseBoolean( isSuccessResponse.get() );
if ( ! succeeded) {
ourLog.info("Response:{}{}", NL, myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(response));
return;
}
ourLog.info("Recreation of terminology freetext indexes complete!");
ourLog.info("Response:{}{}", NL, myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(response));
}
public static final String NL = System.getProperty("line.separator");
}

View File

@ -0,0 +1,135 @@
package ca.uhn.fhir.cli;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import ca.uhn.fhir.util.ParametersUtil;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import static ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider.RESP_PARAM_SUCCESS;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@ExtendWith(MockitoExtension.class)
class ReindexTerminologyCommandTest {
private final FhirContext myContext = FhirContext.forR4();
@Spy
private BaseJpaSystemProvider<?, ?> myProvider = spy(new BaseJpaSystemProvider<>());
@RegisterExtension
public final RestfulServerExtension myRestfulServerExtension =
new RestfulServerExtension(myContext, myProvider);
private final PrintStream standardOut = System.out;
private final ByteArrayOutputStream outputStreamCaptor = new ByteArrayOutputStream();
static {
System.setProperty("test", "true");
}
@Test
public void testProviderMethodInvoked() {
System.setOut(new PrintStream(outputStreamCaptor));
IBaseParameters retVal = ParametersUtil.newInstance(myContext);
ParametersUtil.addParameterToParametersBoolean(myContext, retVal, RESP_PARAM_SUCCESS, true);
doReturn(retVal).when(myProvider).reindexTerminology(any());
App.main(new String[]{
ReindexTerminologyCommand.REINDEX_TERMINOLOGY,
"-v", "r4",
"-t", myRestfulServerExtension.getBaseUrl()
});
assertThat(outputStreamCaptor.toString().trim(),
outputStreamCaptor.toString().trim(), containsString("<valueBoolean value=\"true\"/>"));
}
@Test
public void testNoVersionThrows() {
IBaseParameters retVal = ParametersUtil.newInstance(myContext);
ParametersUtil.addParameterToParametersBoolean(myContext, retVal, RESP_PARAM_SUCCESS, true);
doReturn(retVal).when(myProvider).reindexTerminology(any());
Error thrown = assertThrows(Error.class, () ->
App.main(new String[]{
ReindexTerminologyCommand.REINDEX_TERMINOLOGY,
"-t", myRestfulServerExtension.getBaseUrl()
})
);
assertThat(thrown.getMessage(), containsString("Missing required option: v"));
}
@Test
public void testNoTargetThrows() {
IBaseParameters retVal = ParametersUtil.newInstance(myContext);
ParametersUtil.addParameterToParametersBoolean(myContext, retVal, RESP_PARAM_SUCCESS, true);
doReturn(retVal).when(myProvider).reindexTerminology(any());
Error thrown = assertThrows(Error.class, () ->
App.main(new String[]{ReindexTerminologyCommand.REINDEX_TERMINOLOGY, "-v", "r4"})
);
assertThat(thrown.getMessage(), containsString("Missing required option: t"));
}
@Test
public void testHandleUnexpectedResponse() {
System.setOut(new PrintStream(outputStreamCaptor));
IBaseParameters retVal = ParametersUtil.newInstance(myContext);
doReturn(retVal).when(myProvider).reindexTerminology(any());
App.main(new String[]{
ReindexTerminologyCommand.REINDEX_TERMINOLOGY,
"-v", "r4",
"-t", myRestfulServerExtension.getBaseUrl()
});
assertThat(outputStreamCaptor.toString().trim(),
outputStreamCaptor.toString().trim(), containsString("<valueBoolean value=\"false\"/>"));
assertThat(outputStreamCaptor.toString().trim(),
outputStreamCaptor.toString().trim(), containsString("<valueString value=\"Internal error. " +
"Command result unknown. Check system logs for details\"/>"));
}
@Test
public void testHandleServiceError() {
System.setOut(new PrintStream(outputStreamCaptor));
IBaseParameters retVal = ParametersUtil.newInstance(myContext);
ParametersUtil.addParameterToParametersBoolean(myContext, retVal, RESP_PARAM_SUCCESS, false);
ParametersUtil.addParameterToParametersString(myContext, retVal, "message",
"Freetext service is not configured. Operation didn't run.");
doReturn(retVal).when(myProvider).reindexTerminology(any());
App.main(new String[]{
ReindexTerminologyCommand.REINDEX_TERMINOLOGY,
"-v", "r4",
"-t", myRestfulServerExtension.getBaseUrl()
});
assertThat(outputStreamCaptor.toString().trim(),
outputStreamCaptor.toString().trim(), containsString("<valueBoolean value=\"false\"/>"));
assertThat(outputStreamCaptor.toString().trim(),
outputStreamCaptor.toString().trim(), containsString("<valueString value=\"Freetext service is not configured. Operation didn't run.\"/>"));
}
}

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 3450
title: "There was no way to recreate freetext indexes for terminology entities. Command line operation
reindex-terminology was added for this purpose."

View File

@ -142,6 +142,18 @@
<artifactId>hapi-fhir-storage-batch2-jobs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-storage</artifactId>
@ -272,17 +284,6 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-test-utilities</artifactId>

View File

@ -29,6 +29,8 @@ import org.hibernate.search.engine.cfg.BackendSettings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import javax.sql.DataSource;
public class HibernatePropertiesProvider {
@Autowired
@ -61,4 +63,9 @@ public class HibernatePropertiesProvider {
}
return myHibernateSearchBackend;
}
public DataSource getDataSource() {
return myEntityManagerFactory.getDataSource();
}
}

View File

@ -0,0 +1,33 @@
package ca.uhn.fhir.jpa.config.util;
import org.apache.commons.dbcp2.BasicDataSource;
import java.util.Optional;
/**
* Utility class to hide the complexities of obtaining ConnectionPool information
*/
public class BasicDataSourceConnectionPoolInfoProvider implements IConnectionPoolInfoProvider {
public final BasicDataSource myDataSource;
public BasicDataSourceConnectionPoolInfoProvider(BasicDataSource theDataSource) {
myDataSource = theDataSource;
}
@Override
public Optional<Integer> getTotalConnectionSize() {
return Optional.of( myDataSource.getMaxTotal() );
}
@Override
public Optional<Integer> getActiveConnections() {
return Optional.of( myDataSource.getNumActive() );
}
@Override
public Optional<Long> getMaxWaitMillis() {
return Optional.of( myDataSource.getMaxWaitMillis() );
}
}

View File

@ -0,0 +1,57 @@
package ca.uhn.fhir.jpa.config.util;
import net.ttddyy.dsproxy.support.ProxyDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Optional;
/**
* Utility to hide complexity involved in obtaining connection pool information
*/
public class ConnectionPoolInfoProvider implements IConnectionPoolInfoProvider {
private static final Logger ourLog = LoggerFactory.getLogger(ConnectionPoolInfoProvider.class);
private IConnectionPoolInfoProvider provider;
public ConnectionPoolInfoProvider(DataSource theDataSource) {
if (theDataSource.getClass().isAssignableFrom(BasicDataSource.class)) {
provider = new BasicDataSourceConnectionPoolInfoProvider((BasicDataSource) theDataSource);
return;
}
if ( theDataSource.getClass().isAssignableFrom(ProxyDataSource.class)) {
boolean basiDataSourceWrapped = false;
try {
basiDataSourceWrapped = theDataSource.isWrapperFor(BasicDataSource.class);
if (basiDataSourceWrapped) {
BasicDataSource basicDataSource = theDataSource.unwrap(BasicDataSource.class);
provider = new BasicDataSourceConnectionPoolInfoProvider(basicDataSource);
}
} catch (SQLException ignored) { }
}
}
@Override
public Optional<Integer> getTotalConnectionSize() {
return provider == null ? Optional.empty() : provider.getTotalConnectionSize();
}
@Override
public Optional<Integer> getActiveConnections() {
return provider == null ? Optional.empty() : provider.getActiveConnections();
}
@Override
public Optional<Long> getMaxWaitMillis() {
return provider == null ? Optional.empty() : provider.getMaxWaitMillis();
}
}

View File

@ -0,0 +1,12 @@
package ca.uhn.fhir.jpa.config.util;
import java.util.Optional;
public interface IConnectionPoolInfoProvider {
Optional<Integer> getTotalConnectionSize();
Optional<Integer> getActiveConnections();
Optional<Long> getMaxWaitMillis();
}

View File

@ -20,11 +20,14 @@ package ca.uhn.fhir.jpa.provider;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.model.ExpungeOutcome;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.jpa.term.api.ReindexTerminologyResult;
import ca.uhn.fhir.rest.annotation.At;
import ca.uhn.fhir.rest.annotation.History;
import ca.uhn.fhir.rest.annotation.Offset;
@ -34,9 +37,14 @@ import ca.uhn.fhir.rest.annotation.Since;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.util.ParametersUtil;
import ca.uhn.fhir.util.StopWatch;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
@ -44,6 +52,9 @@ import javax.servlet.http.HttpServletRequest;
import java.util.Date;
public class BaseJpaSystemProvider<T, MT> extends BaseJpaProvider implements IJpaSystemProvider {
private static final Logger ourLog = LoggerFactory.getLogger(BaseJpaSystemProvider.class);
public static final String RESP_PARAM_SUCCESS = "success";
/**
* @see ProviderConstants#OPERATION_REINDEX
@ -62,6 +73,10 @@ public class BaseJpaSystemProvider<T, MT> extends BaseJpaProvider implements IJp
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
private ITermReadSvc myTermReadSvc;
public BaseJpaSystemProvider() {
// nothing
}
@ -116,4 +131,36 @@ public class BaseJpaSystemProvider<T, MT> extends BaseJpaProvider implements IJp
}
}
@Operation(name = ProviderConstants.OPERATION_REINDEX_TERMINOLOGY, idempotent = false)
public IBaseParameters reindexTerminology(RequestDetails theRequestDetails) {
ReindexTerminologyResult result;
StopWatch sw = new StopWatch();
try {
result = myTermReadSvc.reindexTerminology();
} catch (Exception theE) {
throw new InternalErrorException(Msg.code(2072) +
"Re-creating terminology freetext indexes failed with exception: " + theE.getMessage());
}
IBaseParameters retVal = ParametersUtil.newInstance(getContext());
if ( ! result.equals(ReindexTerminologyResult.SUCCESS) ) {
ParametersUtil.addParameterToParametersBoolean(getContext(), retVal, RESP_PARAM_SUCCESS, false);
String msg = result.equals(ReindexTerminologyResult.SEARCH_SVC_DISABLED)
? "Freetext service is not configured. Operation didn't run."
: "Operation was cancelled because other terminology background tasks are currently running. Try again in a few minutes.";
ParametersUtil.addParameterToParametersString(getContext(), retVal, "message", msg);
return retVal;
}
ParametersUtil.addParameterToParametersBoolean(getContext(), retVal, RESP_PARAM_SUCCESS, true);
ourLog.info("Re-creating terminology freetext indexes took {}", sw);
return retVal;
}
}

View File

@ -34,6 +34,8 @@ import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDaoCodeSystem;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.config.HibernatePropertiesProvider;
import ca.uhn.fhir.jpa.config.util.ConnectionPoolInfoProvider;
import ca.uhn.fhir.jpa.config.util.IConnectionPoolInfoProvider;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemDao;
import ca.uhn.fhir.jpa.dao.data.ITermCodeSystemVersionDao;
@ -67,6 +69,7 @@ import ca.uhn.fhir.jpa.search.ElasticsearchNestedQueryBuilderUtil;
import ca.uhn.fhir.jpa.search.builder.SearchBuilder;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.jpa.term.api.ReindexTerminologyResult;
import ca.uhn.fhir.jpa.term.ex.ExpansionTooCostlyException;
import ca.uhn.fhir.jpa.util.LogicUtil;
import ca.uhn.fhir.rest.api.Constants;
@ -98,6 +101,7 @@ import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.RegexpQuery;
import org.apache.lucene.search.TermQuery;
import org.hibernate.CacheMode;
import org.hibernate.search.backend.elasticsearch.ElasticsearchExtension;
import org.hibernate.search.backend.lucene.LuceneExtension;
import org.hibernate.search.engine.search.predicate.dsl.BooleanPredicateClausesStep;
@ -106,6 +110,7 @@ import org.hibernate.search.engine.search.predicate.dsl.SearchPredicateFactory;
import org.hibernate.search.engine.search.query.SearchQuery;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.common.EntityReference;
import org.hibernate.search.mapper.orm.massindexing.impl.LoggingMassIndexingMonitor;
import org.hibernate.search.mapper.orm.session.SearchSession;
import org.hl7.fhir.common.hapi.validation.support.CommonCodeSystemsTerminologyService;
import org.hl7.fhir.common.hapi.validation.support.InMemoryTerminologyServerValidationSupport;
@ -135,6 +140,7 @@ import org.hl7.fhir.r4.model.codesystems.ConceptSubsumptionOutcome;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@ -215,6 +221,12 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
private static final String IDX_PROP_VALUE_STRING = IDX_PROPERTIES + ".myValueString";
private static final String IDX_PROP_DISPLAY_STRING = IDX_PROPERTIES + ".myDisplayString";
public static final int DEFAULT_MASS_INDEXER_OBJECT_LOADING_THREADS = 2;
// doesn't seem to be much gain by using more threads than this value
public static final int MAX_MASS_INDEXER_OBJECT_LOADING_THREADS = 6;
private boolean myPreExpandingValueSets = false;
private final Cache<String, TermCodeSystemVersion> myCodeSystemCurrentVersionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
@Autowired
protected DaoRegistry myDaoRegistry;
@ -270,6 +282,7 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
@Autowired
private HibernatePropertiesProvider myHibernatePropertiesProvider;
private boolean isFullTextSetToUseElastic() {
return "elasticsearch".equalsIgnoreCase(myHibernatePropertiesProvider.getHibernateSearchBackend());
}
@ -1945,6 +1958,8 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
}
TermValueSet termValueSet = optionalTermValueSet.get();
termValueSet.setTotalConcepts(0L);
termValueSet.setTotalConceptDesignations(0L);
termValueSet.setExpansionStatus(TermValueSetPreExpansionStatusEnum.EXPANSION_IN_PROGRESS);
return myTermValueSetDao.saveAndFlush(termValueSet);
});
@ -1953,6 +1968,7 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
}
// We have a ValueSet to pre-expand.
setPreExpandingValueSets(true);
try {
ValueSet valueSet = txTemplate.execute(t -> {
TermValueSet refreshedValueSetToExpand = myTermValueSetDao.findById(valueSetToExpand.getId()).orElseThrow(() -> new IllegalStateException("Unknown VS ID: " + valueSetToExpand.getId()));
@ -1982,10 +1998,21 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
myTermValueSetDao.saveAndFlush(valueSetToExpand);
});
} finally {
setPreExpandingValueSets(false);
}
}
}
private synchronized void setPreExpandingValueSets(boolean thePreExpandingValueSets) {
myPreExpandingValueSets = thePreExpandingValueSets;
}
private synchronized boolean isPreExpandingValueSets() {
return myPreExpandingValueSets;
}
@Override
@Transactional
public CodeValidationResult validateCode(ConceptValidationOptions theOptions, IIdType theValueSetId, String theValueSetIdentifier, String theCodeSystemIdentifierToValidate, String theCodeToValidate, String theDisplayToValidate, IBaseDatatype theCodingToValidate, IBaseDatatype theCodeableConceptToValidate) {
@ -2642,6 +2669,79 @@ public abstract class BaseTermReadSvcImpl implements ITermReadSvc {
return Optional.of(cs);
}
private static final int SECONDS_IN_MINUTE = 60;
private static final int INDEXED_ROOTS_LOGGING_COUNT = 50_000;
@Transactional
@Override
public ReindexTerminologyResult reindexTerminology() throws InterruptedException {
if (myFulltextSearchSvc == null) {
return ReindexTerminologyResult.SEARCH_SVC_DISABLED;
}
if (isBatchTerminologyTasksRunning()) {
return ReindexTerminologyResult.OTHER_BATCH_TERMINOLOGY_TASKS_RUNNING;
}
// disallow pre-expanding ValueSets while reindexing
myDeferredStorageSvc.setProcessDeferred(false);
int objectLoadingThreadNumber = calculateObjectLoadingThreadNumber();
ourLog.info("Using {} threads to load objects", objectLoadingThreadNumber);
try {
SearchSession searchSession = getSearchSession();
searchSession
.massIndexer( TermConcept.class )
.dropAndCreateSchemaOnStart( true )
.purgeAllOnStart( false )
.batchSizeToLoadObjects( 100 )
.cacheMode( CacheMode.IGNORE )
.threadsToLoadObjects( 6 )
.transactionTimeout( 60 * SECONDS_IN_MINUTE )
.monitor( new LoggingMassIndexingMonitor(INDEXED_ROOTS_LOGGING_COUNT) )
.startAndWait();
} finally {
myDeferredStorageSvc.setProcessDeferred(true);
}
return ReindexTerminologyResult.SUCCESS;
}
@VisibleForTesting
boolean isBatchTerminologyTasksRunning() {
return isNotSafeToPreExpandValueSets() || isPreExpandingValueSets();
}
@VisibleForTesting
int calculateObjectLoadingThreadNumber() {
IConnectionPoolInfoProvider connectionPoolInfoProvider =
new ConnectionPoolInfoProvider(myHibernatePropertiesProvider.getDataSource());
Optional<Integer> maxConnectionsOpt = connectionPoolInfoProvider.getTotalConnectionSize();
if ( ! maxConnectionsOpt.isPresent() ) {
return DEFAULT_MASS_INDEXER_OBJECT_LOADING_THREADS;
}
int maxConnections = maxConnectionsOpt.get();
int usableThreads = maxConnections < 6 ? 1 : maxConnections - 5;
int objectThreads = Math.min(usableThreads, MAX_MASS_INDEXER_OBJECT_LOADING_THREADS);
ourLog.debug("Data source connection pool has {} connections allocated, so reindexing will use {} object " +
"loading threads (each using a connection)", maxConnections, objectThreads);
return objectThreads;
}
@VisibleForTesting
SearchSession getSearchSession() {
return Search.session( myEntityManager );
}
@VisibleForTesting
public static void setForceDisableHibernateSearchForUnitTest(boolean theForceDisableHibernateSearchForUnitTest) {
ourForceDisableHibernateSearchForUnitTest = theForceDisableHibernateSearchForUnitTest;

View File

@ -133,4 +133,10 @@ public interface ITermReadSvc extends IValidationSupport {
*/
Optional<IBaseResource> readCodeSystemByForcedId(String theForcedId);
/**
* Version independent
* Recreates freetext indexes for TermConcept and nested TermConceptProperty
*/
ReindexTerminologyResult reindexTerminology() throws InterruptedException;
}

View File

@ -0,0 +1,11 @@
package ca.uhn.fhir.jpa.term.api;
public enum ReindexTerminologyResult {
SUCCESS,
// search service is not enabled
SEARCH_SVC_DISABLED,
// batch terminology tasks other than re-indexing are currently running
OTHER_BATCH_TERMINOLOGY_TASKS_RUNNING
}

View File

@ -155,7 +155,7 @@ public class TestR4Config {
// .logQueryBySlf4j(level)
.logSlowQueryBySlf4j(10, TimeUnit.SECONDS, level)
.beforeQuery(new BlockLargeNumbersOfParamsListener())
.beforeQuery(new MandatoryTransactionListener())
.beforeQuery( getMandatoryTransactionListener() )
.afterQuery(captureQueriesListener())
.afterQuery(new CurrentThreadCaptureQueriesListener())
.countQuery(singleQueryCountHolder())
@ -170,6 +170,12 @@ public class TestR4Config {
return new SingleQueryCountHolder();
}
@Bean
public ProxyDataSourceBuilder.SingleQueryExecution getMandatoryTransactionListener() {
return new MandatoryTransactionListener();
}
@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory(ConfigurableListableBeanFactory theConfigurableListableBeanFactory, FhirContext theFhirContext) {
LocalContainerEntityManagerFactoryBean retVal = HapiEntityManagerFactoryUtil.newEntityManagerFactory(theConfigurableListableBeanFactory, theFhirContext);

View File

@ -0,0 +1,144 @@
package ca.uhn.fhir.jpa.config.util;
import ca.uhn.fhir.jpa.config.r4.JpaR4Config;
import net.ttddyy.dsproxy.support.ProxyDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import javax.sql.DataSource;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
class ConnectionPoolInfoProviderTest {
public static final long MAX_WAIT_MILLIS = 10_000;
public static final int MAX_CONNECTIONS_TOTAL = 50;
private IConnectionPoolInfoProvider tested;
@Nested
public class TestBasiDataSourceImplementation {
@BeforeEach
void setUp() {
BasicDataSource myDataSource = new BasicDataSource();
myDataSource.setMaxWaitMillis(MAX_WAIT_MILLIS);
myDataSource.setMaxTotal(MAX_CONNECTIONS_TOTAL);
tested = new BasicDataSourceConnectionPoolInfoProvider(myDataSource);
}
@Test
void testGetMaxWaitMillis() {
Optional<Long> resOpt = tested.getMaxWaitMillis();
assertTrue(resOpt.isPresent());
assertEquals(MAX_WAIT_MILLIS, resOpt.get());
}
@Test
void testGetMaxConnectionSize() {
Optional<Integer> resOpt = tested.getTotalConnectionSize();
assertTrue(resOpt.isPresent());
assertEquals(MAX_CONNECTIONS_TOTAL, resOpt.get());
}
}
@Nested
public class TestFailedProviderSetup {
@Mock DataSource unknownDataSource;
@BeforeEach
void setUp() {
tested = new ConnectionPoolInfoProvider(unknownDataSource);
}
@Test
void testGetMaxWaitMillis() {
Optional<Long> resOpt = tested.getMaxWaitMillis();
assertFalse(resOpt.isPresent());
}
@Test
void testGetMaxConnectionSize() {
Optional<Integer> resOpt = tested.getTotalConnectionSize();
assertFalse(resOpt.isPresent());
}
@Test
void testGetActiveConnections() {
Optional<Integer> resOpt = tested.getActiveConnections();
assertFalse(resOpt.isPresent());
}
}
@Nested
public class TestConfig {
@Mock DataSource unknownDataSource;
@Test
void dataSourceIsBasicDataSource() {
DataSource ds = new BasicDataSource();
IConnectionPoolInfoProvider provider = new ConnectionPoolInfoProvider(ds);
IConnectionPoolInfoProvider instantiatedProvider =
(IConnectionPoolInfoProvider) ReflectionTestUtils.getField(provider, "provider");
assertNotNull(instantiatedProvider);
assertTrue(instantiatedProvider.getClass().isAssignableFrom(BasicDataSourceConnectionPoolInfoProvider.class));
}
@Test
void dataSourceIsProxyDataSourceWrappingBasicDataSource() {
DataSource ds = new BasicDataSource();
ProxyDataSource proxyDs = new ProxyDataSource(ds);
IConnectionPoolInfoProvider provider = new ConnectionPoolInfoProvider(proxyDs);
IConnectionPoolInfoProvider instantiatedProvider =
(IConnectionPoolInfoProvider) ReflectionTestUtils.getField(provider, "provider");
assertNotNull(instantiatedProvider);
assertTrue(instantiatedProvider.getClass().isAssignableFrom(BasicDataSourceConnectionPoolInfoProvider.class));
}
@Test
void dataSourceIsProxyDataSourceWrappingNotBasicDataSource() {
ProxyDataSource proxyDs = new ProxyDataSource(unknownDataSource);
IConnectionPoolInfoProvider provider = new ConnectionPoolInfoProvider(proxyDs);
IConnectionPoolInfoProvider instantiatedProvider =
(IConnectionPoolInfoProvider) ReflectionTestUtils.getField(provider, "provider");
assertNull(instantiatedProvider);
}
@Test
void dataSourceIsNotBasicDataSourceOrProxyDataSource() {
IConnectionPoolInfoProvider provider = new ConnectionPoolInfoProvider(unknownDataSource);
IConnectionPoolInfoProvider instantiatedProvider =
(IConnectionPoolInfoProvider) ReflectionTestUtils.getField(provider, "provider");
assertNull(instantiatedProvider);
}
}
}

View File

@ -0,0 +1,103 @@
package ca.uhn.fhir.jpa.provider;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.jpa.term.api.ReindexTerminologyResult;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.ParametersUtil;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Optional;
import static ca.uhn.fhir.jpa.provider.BaseJpaSystemProvider.RESP_PARAM_SUCCESS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TerminologyFreetextIndexingProviderTest {
private final FhirContext myContext = FhirContext.forR4();
@Mock private ITermReadSvc myTermReadSvc;
@Mock private SystemRequestDetails myRequestDetails;
@InjectMocks
private BaseJpaSystemProvider<?, ?> testedProvider = new BaseJpaSystemProvider<>();
@BeforeEach
void setUp() {
ReflectionTestUtils.setField(testedProvider, "myContext", myContext);
}
@Test
public void testNoSearchEnabled() throws InterruptedException {
when(myTermReadSvc.reindexTerminology()).thenReturn(ReindexTerminologyResult.SEARCH_SVC_DISABLED);
IBaseParameters retVal = testedProvider.reindexTerminology(myRequestDetails);
assertNotNull(retVal);
Optional<String> successValueOpt = ParametersUtil.getNamedParameterValueAsString(myContext, retVal, RESP_PARAM_SUCCESS);
assertTrue(successValueOpt.isPresent());
assertEquals("false", successValueOpt.get());
Optional<String> msgOpt = ParametersUtil.getNamedParameterValueAsString(myContext, retVal, "message");
assertTrue(msgOpt.isPresent());
assertEquals("Freetext service is not configured. Operation didn't run.", msgOpt.get());
}
@Test
void testOtherTerminologyTasksRunning() throws InterruptedException {
when(myTermReadSvc.reindexTerminology()).thenReturn(ReindexTerminologyResult.OTHER_BATCH_TERMINOLOGY_TASKS_RUNNING);
IBaseParameters retVal = testedProvider.reindexTerminology(myRequestDetails);
assertNotNull(retVal);
Optional<String> successValueOpt = ParametersUtil.getNamedParameterValueAsString(myContext, retVal, RESP_PARAM_SUCCESS);
assertTrue(successValueOpt.isPresent());
assertEquals("false", successValueOpt.get());
Optional<String> msgOpt = ParametersUtil.getNamedParameterValueAsString(myContext, retVal, "message");
assertTrue(msgOpt.isPresent());
assertEquals("Operation was cancelled because other terminology background tasks are currently running. Try again in a few minutes.", msgOpt.get());
}
@Test
void testServiceWorks() throws InterruptedException {
when(myTermReadSvc.reindexTerminology()).thenReturn(ReindexTerminologyResult.SUCCESS);
IBaseParameters retVal = testedProvider.reindexTerminology(myRequestDetails);
assertNotNull(retVal);
Optional<String> successValueOpt = ParametersUtil.getNamedParameterValueAsString(myContext, retVal, RESP_PARAM_SUCCESS);
assertTrue(successValueOpt.isPresent());
assertEquals("true", successValueOpt.get());
}
@Test
void testServiceThroes() throws InterruptedException {
String exceptionMsg = "some msg";
when(myTermReadSvc.reindexTerminology()).thenThrow(new InterruptedException(exceptionMsg));
InternalErrorException thrown = assertThrows(InternalErrorException.class,
() -> testedProvider.reindexTerminology(myRequestDetails));
assertEquals(Msg.code(2072) + "Re-creating terminology freetext indexes " +
"failed with exception: " + exceptionMsg, thrown.getMessage());
}
}

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.term.api;
package ca.uhn.fhir.jpa.term;
/*-
* #%L
@ -22,12 +22,18 @@ package ca.uhn.fhir.jpa.term.api;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.config.HibernatePropertiesProvider;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.ITermValueSetDao;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.term.TermReadSvcR4;
import ca.uhn.fhir.jpa.term.TermReadSvcUtil;
import ca.uhn.fhir.jpa.term.api.ITermDeferredStorageSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import com.google.common.collect.Lists;
import net.ttddyy.dsproxy.support.ProxyDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.hibernate.search.mapper.orm.massindexing.MassIndexer;
import org.hibernate.search.mapper.orm.session.SearchSession;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.CodeSystem;
import org.junit.jupiter.api.BeforeEach;
@ -35,26 +41,35 @@ import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.Pageable;
import org.springframework.test.util.ReflectionTestUtils;
import javax.persistence.EntityManager;
import javax.persistence.NonUniqueResultException;
import javax.sql.DataSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static ca.uhn.fhir.jpa.term.BaseTermReadSvcImpl.DEFAULT_MASS_INDEXER_OBJECT_LOADING_THREADS;
import static ca.uhn.fhir.jpa.term.BaseTermReadSvcImpl.MAX_MASS_INDEXER_OBJECT_LOADING_THREADS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -257,9 +272,10 @@ class ITermReadSvcTest {
List<TermConcept> termConcepts = Lists.newArrayList(termConceptCode1, termConceptCode3, termConceptCode4);
List<String> values = Arrays.asList(CODE_1, CODE_2, CODE_3, CODE_4, CODE_5);
String msg = (String) ReflectionTestUtils.invokeMethod(
String msg = ReflectionTestUtils.invokeMethod(
testedClass, "getTermConceptsFetchExceptionMsg", termConcepts, values);
assertNotNull(msg);
assertTrue(msg.contains("No TermConcept(s) were found"));
assertFalse(msg.contains(CODE_1));
assertTrue(msg.contains(CODE_2));
@ -276,13 +292,101 @@ class ITermReadSvcTest {
when(termConceptCode3.getId()).thenReturn(3L);
List<TermConcept> termConcepts = Lists.newArrayList(termConceptCode1, termConceptCode3);
List<String> values = Arrays.asList(CODE_3);
String msg = (String) ReflectionTestUtils.invokeMethod(
List<String> values = List.of(CODE_3);
String msg = ReflectionTestUtils.invokeMethod(
testedClass, "getTermConceptsFetchExceptionMsg", termConcepts, values);
assertNotNull(msg);
assertTrue(msg.contains("More TermConcepts were found than indicated codes"));
assertFalse(msg.contains("Queried codes: [" + CODE_3 + "]"));
assertTrue(msg.contains("Obtained TermConcept IDs, codes: [1, code-1; 3, code-3]"));
}
}
@Nested
public class TestReindexTerminology {
@Mock private SearchSession mySearchSession;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private MassIndexer myMassIndexer;
@Mock private IFulltextSearchSvc myFulltextSearchSvc;
@Mock private ITermDeferredStorageSvc myDeferredStorageSvc;
@Mock private HibernatePropertiesProvider myHibernatePropertiesProvider;
@InjectMocks
@Spy private BaseTermReadSvcImpl myTermReadSvc = (BaseTermReadSvcImpl) spy(testedClass);
@Test
void testReindexTerminology() throws InterruptedException {
doReturn(mySearchSession).when(myTermReadSvc).getSearchSession();
doReturn(false).when(myTermReadSvc).isBatchTerminologyTasksRunning();
doReturn(10).when(myTermReadSvc).calculateObjectLoadingThreadNumber();
when(mySearchSession.massIndexer(TermConcept.class)).thenReturn(myMassIndexer);
myTermReadSvc.reindexTerminology();
verify(mySearchSession, times(1)).massIndexer(TermConcept.class);
}
@Nested
public class TestCalculateObjectLoadingThreadNumber {
private final BasicDataSource myBasicDataSource = new BasicDataSource();
private final ProxyDataSource myProxyDataSource = new ProxyDataSource(myBasicDataSource) ;
@BeforeEach
void setUp() {
doReturn(myProxyDataSource).when(myHibernatePropertiesProvider).getDataSource();
}
@Test
void testLessThanSix() {
myBasicDataSource.setMaxTotal(5);
int retMaxConnectionSize = myTermReadSvc.calculateObjectLoadingThreadNumber();
assertEquals(1, retMaxConnectionSize);
}
@Test
void testMoreThanSixButLessThanLimit() {
myBasicDataSource.setMaxTotal(10);
int retMaxConnectionSize = myTermReadSvc.calculateObjectLoadingThreadNumber();
assertEquals(5, retMaxConnectionSize);
}
@Test
void testMoreThanSixAndMoreThanLimit() {
myBasicDataSource.setMaxTotal(35);
int retMaxConnectionSize = myTermReadSvc.calculateObjectLoadingThreadNumber();
assertEquals(MAX_MASS_INDEXER_OBJECT_LOADING_THREADS, retMaxConnectionSize);
}
}
@Nested
public class TestCalculateObjectLoadingThreadNumberDefault {
@Mock private DataSource myDataSource = new BasicDataSource();
@BeforeEach
void setUp() {
doReturn(myDataSource).when(myHibernatePropertiesProvider).getDataSource();
}
@Test
void testDefaultWhenCantGetMaxConnections() {
int retMaxConnectionSize = myTermReadSvc.calculateObjectLoadingThreadNumber();
assertEquals(DEFAULT_MASS_INDEXER_OBJECT_LOADING_THREADS, retMaxConnectionSize);
}
}
}
}

View File

@ -0,0 +1,342 @@
package ca.uhn.fhir.jpa.term.freetext;
import ca.uhn.fhir.jpa.config.TestHibernateSearchAddInConfig;
import ca.uhn.fhir.jpa.config.TestR4Config;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.entity.TermCodeSystem;
import ca.uhn.fhir.jpa.entity.TermCodeSystemVersion;
import ca.uhn.fhir.jpa.entity.TermConcept;
import ca.uhn.fhir.jpa.entity.TermValueSet;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
import ca.uhn.fhir.jpa.term.TermLoaderSvcImpl;
import ca.uhn.fhir.jpa.term.api.ITermLoaderSvc;
import ca.uhn.fhir.jpa.term.api.ITermReadSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import net.ttddyy.dsproxy.ExecutionInfo;
import net.ttddyy.dsproxy.QueryInfo;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.hibernate.search.engine.search.predicate.dsl.PredicateFinalStep;
import org.hibernate.search.engine.search.predicate.dsl.SearchPredicateFactory;
import org.hibernate.search.engine.search.query.SearchQuery;
import org.hibernate.search.mapper.orm.Search;
import org.hibernate.search.mapper.orm.common.EntityReference;
import org.hibernate.search.mapper.orm.mapping.SearchMapping;
import org.hibernate.search.mapper.orm.session.SearchSession;
import org.hibernate.search.mapper.orm.work.SearchWorkspace;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.util.ResourceUtils;
import javax.persistence.EntityManager;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.TERM_CODE_SYSTEM_VERSION_DELETE_JOB_NAME;
import static java.util.Map.entry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {TestR4Config.class, TestHibernateSearchAddInConfig.LuceneFilesystem.class
,ReindexTerminologyFreetextR4Test.NoopMandatoryTransactionListener.class
})
public class ReindexTerminologyFreetextR4Test extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexTerminologyFreetextR4Test.class);
public static final String LOINC_URL = "http://loinc.org";
public static final String TEST_FILES_CLASSPATH = "loinc-reindex/";
public static final String NULL = "'null'";
// set to false to avoid cleanup for debugging purposes
public static final boolean CLEANUP_DATA = true;
public static final String CS_VERSION = "2.68";
public static final int CS_CONCEPTS_NUMBER = 81;
public static final String LOINC_PROPERTIES_CLASSPATH =
ResourceUtils.CLASSPATH_URL_PREFIX + TEST_FILES_CLASSPATH + "Loinc_small_v68.zip";
public static final String LOINC_ZIP_CLASSPATH =
ResourceUtils.CLASSPATH_URL_PREFIX + TEST_FILES_CLASSPATH + "v268_loincupload.properties";
@Autowired private EntityManager myEntityManager;
@Autowired private TermLoaderSvcImpl myTermLoaderSvc;
@Autowired private ITermReadSvc myITermReadSvc;
@Autowired private ITermConceptDao myTermConceptDao;
@Autowired private ITermReadSvc myTermReadSvc;
long termCodeSystemVersionWithVersionId;
long termCodeSystemVersionWithNoVersionId;
Map<String, Long> conceptCounts = Map.ofEntries(
entry("http://loinc.org/vs", 81L),
entry("http://loinc.org/vs/LG100-4", 0L),
entry("http://loinc.org/vs/LG1695-8", 0L),
entry("http://loinc.org/vs/LL1000-0", 3L),
entry("http://loinc.org/vs/LL1001-8", 7L),
entry("http://loinc.org/vs/LL1892-0", 0L),
entry("http://loinc.org/vs/loinc-document-ontology", 1L),
entry("http://loinc.org/vs/loinc-imaging-document-codes", 1L),
entry("http://loinc.org/vs/loinc-rsna-radiology-playbook", 1L),
entry("http://loinc.org/vs/loinc-universal-order-set", 0L),
entry("http://loinc.org/vs/top-2000-lab-observations-si", 0L),
entry("http://loinc.org/vs/top-2000-lab-observations-us", 0L)
);
Map<String, Long> conceptDesignationCounts = Map.ofEntries(
entry("http://loinc.org/vs", 55L),
entry("http://loinc.org/vs/LG100-4", 0L),
entry("http://loinc.org/vs/LG1695-8", 0L),
entry("http://loinc.org/vs/LL1000-0", 0L),
entry("http://loinc.org/vs/LL1001-8", 0L),
entry("http://loinc.org/vs/LL1892-0", 0L),
entry("http://loinc.org/vs/loinc-document-ontology", 0L),
entry("http://loinc.org/vs/loinc-imaging-document-codes", 1L),
entry("http://loinc.org/vs/loinc-rsna-radiology-playbook", 1L),
entry("http://loinc.org/vs/loinc-universal-order-set", 0L),
entry("http://loinc.org/vs/top-2000-lab-observations-si", 0L),
entry("http://loinc.org/vs/top-2000-lab-observations-us", 0L)
);
@Test()
public void uploadLoincCodeSystem() throws FileNotFoundException, InterruptedException {
List<ITermLoaderSvc.FileDescriptor> myFileDescriptors = buildFileDescriptors();
// upload terminology
myTermLoaderSvc.loadLoinc(myFileDescriptors, mySrd);
// save all deferred concepts, properties, links, etc
myTerminologyDeferredStorageSvc.saveAllDeferred();
validateSavedConceptsCount();
// check the number of freetext-indexed TermConcepts
validateFreetextCounts();
// pre-expand ValueSets
myTermReadSvc.preExpandDeferredValueSetsToTerminologyTables();
// pre-expansion uses freetext so check is to make sure all valuesets have the right number of concepts
validateValueSetPreexpansion();
// run reindexing operation
reFreetextIndexTerminology();
// validate again after reindexing the number of freetext-indexed TermConcepts
validateFreetextCounts();
// remove ValueSet pre-expansions
removeValueSetPreExpansions();
// pre-expand ValueSets again, after freetext reindexing
myTermReadSvc.preExpandDeferredValueSetsToTerminologyTables();
// pre-expansion uses freetext so check is to make sure all valuesets have the right number of concepts
validateValueSetPreexpansion();
}
private void removeValueSetPreExpansions() {
List<TermValueSet> termValueSets = myTermValueSetDao.findAll();
for (TermValueSet termValueSet : termValueSets) {
myTermReadSvc.invalidatePreCalculatedExpansion(termValueSet.getResource().getIdDt(), new SystemRequestDetails());
}
}
/**
* check number of TermConcepts (DB vs freetext-indexed)
*/
private void validateFreetextCounts() {
int dbTermConceptCountForVersion = runInTransaction(() ->
myTermConceptDao.countByCodeSystemVersion(termCodeSystemVersionWithVersionId) );
assertEquals(CS_CONCEPTS_NUMBER, dbTermConceptCountForVersion);
long termConceptCountForVersion = searchAllIndexedTermConceptCount(termCodeSystemVersionWithVersionId);
ourLog.info("=================> Number of freetext found concepts after re-indexing for version {}: {}",
CS_VERSION, termConceptCountForVersion);
assertEquals(CS_CONCEPTS_NUMBER, termConceptCountForVersion);
int dbTermConceptCountForNullVersion = runInTransaction(() ->
myTermConceptDao.countByCodeSystemVersion(termCodeSystemVersionWithNoVersionId) );
assertEquals(CS_CONCEPTS_NUMBER, dbTermConceptCountForNullVersion);
long termConceptCountNullVersion = searchAllIndexedTermConceptCount(termCodeSystemVersionWithVersionId);
ourLog.info("=================> Number of freetext found concepts after re-indexing for version {}: {}",
NULL, termConceptCountNullVersion);
assertEquals(CS_CONCEPTS_NUMBER, termConceptCountNullVersion);
}
private void validateFreetextIndexesEmpty() {
long termConceptCountVersioned = searchAllIndexedTermConceptCount(termCodeSystemVersionWithVersionId);
assertEquals(0, termConceptCountVersioned);
long termConceptCountNotVersioned = searchAllIndexedTermConceptCount(termCodeSystemVersionWithNoVersionId);
assertEquals(0, termConceptCountNotVersioned);
}
/**
* Checks the number of VS Concepts and ConceptDesignations against test pre-specified values
*/
private void validateValueSetPreexpansion() {
List<TermValueSet> termValueSets = myTermValueSetDao.findAll();
for (TermValueSet termValueSet : termValueSets) {
ourLog.debug("=================> testing ValueSet: {}", termValueSet.getUrl());
long conceptCount = conceptCounts.get( termValueSet.getUrl() );
assertEquals(conceptCount, termValueSet.getTotalConcepts());
long conceptDesignationCount = conceptDesignationCounts.get( termValueSet.getUrl() );
assertEquals(conceptDesignationCount, termValueSet.getTotalConceptDesignations());
}
}
private void validateSavedConceptsCount() {
termCodeSystemVersionWithVersionId = getTermCodeSystemVersionNotNullId();
int dbVersionedTermConceptCount = runInTransaction(() ->
myTermConceptDao.countByCodeSystemVersion(termCodeSystemVersionWithVersionId) );
ourLog.info("=================> Number of stored concepts for version {}: {}", CS_VERSION, dbVersionedTermConceptCount);
assertEquals(CS_CONCEPTS_NUMBER, dbVersionedTermConceptCount);
termCodeSystemVersionWithNoVersionId = getTermCodeSystemVersionNullId();
int dbNotVersionedTermConceptCount = runInTransaction(() ->
myTermConceptDao.countByCodeSystemVersion(termCodeSystemVersionWithNoVersionId) );
ourLog.info("=================> Number of stored concepts for version {}: {}", NULL, dbNotVersionedTermConceptCount);
assertEquals(CS_CONCEPTS_NUMBER, dbNotVersionedTermConceptCount);
}
private void reFreetextIndexTerminology() throws InterruptedException {
myTermReadSvc.reindexTerminology();
}
private long getTermCodeSystemVersionNotNullId() {
return runInTransaction(() -> {
TermCodeSystem myTermCodeSystem = myTermCodeSystemDao.findByCodeSystemUri(LOINC_URL);
TermCodeSystemVersion termCodeSystemVersion = myTermCodeSystemVersionDao
.findByCodeSystemPidAndVersion(myTermCodeSystem.getPid(), CS_VERSION);
assertNotNull(termCodeSystemVersion);
return termCodeSystemVersion.getPid();
});
}
private long getTermCodeSystemVersionNullId() {
return runInTransaction(() -> {
TermCodeSystem myTermCodeSystem = myTermCodeSystemDao.findByCodeSystemUri(LOINC_URL);
TermCodeSystemVersion termCodeSystemVersion = myTermCodeSystemVersionDao
.findByCodeSystemPidVersionIsNull(myTermCodeSystem.getPid());
assertNotNull(termCodeSystemVersion);
return termCodeSystemVersion.getPid();
});
}
private List<ITermLoaderSvc.FileDescriptor> buildFileDescriptors() throws FileNotFoundException {
List<ITermLoaderSvc.FileDescriptor> fileDescriptors = new ArrayList<>();
File propsFile = ResourceUtils.getFile(LOINC_PROPERTIES_CLASSPATH);
fileDescriptors.add( new TerminologyUploaderProvider.FileBackedFileDescriptor(propsFile) );
File zipFile = ResourceUtils.getFile(LOINC_ZIP_CLASSPATH);
fileDescriptors.add( new TerminologyUploaderProvider.FileBackedFileDescriptor(zipFile) );
return fileDescriptors;
}
private long searchAllIndexedTermConceptCount(long theCodeSystemVersionId) {
return runInTransaction(() -> {
SearchSession searchSession = Search.session(myEntityManager);
SearchPredicateFactory predicate = searchSession.scope(TermConcept.class).predicate();
PredicateFinalStep step = predicate.bool(b ->
b.must(predicate.match().field("myCodeSystemVersionPid").matching(theCodeSystemVersionId)) );
SearchQuery<EntityReference> termConceptsQuery = searchSession
.search(TermConcept.class)
.selectEntityReference()
.where(f -> step)
.toQuery();
ourLog.trace("About to query: {}", termConceptsQuery.queryString());
return termConceptsQuery.fetchTotalHitCount();
});
}
/**
* This configuration bypasses the MandatoryTransactionListener, which breaks this test
* (I think it is because hibernate search massIndexer starts threads which don't participate of test transactions)
*/
@Configuration
public static class NoopMandatoryTransactionListener {
@Bean
public ProxyDataSourceBuilder.SingleQueryExecution getMandatoryTransactionListener() {
return new ProxyDataSourceBuilder.SingleQueryExecution() {
@Override
public void execute(ExecutionInfo execInfo, List<QueryInfo> queryInfoList) {
}
};
}
}
@Override
public void afterCleanupDao() {
if (CLEANUP_DATA) {
super.afterCleanupDao();
}
}
@Override
public void afterResetInterceptors() {
if (CLEANUP_DATA) {
super.afterResetInterceptors();
}
}
@Override
public void afterClearTerminologyCaches() {
if (CLEANUP_DATA) {
super.afterClearTerminologyCaches();
}
}
@Override
public void afterPurgeDatabase() {
if (CLEANUP_DATA) {
super.afterPurgeDatabase();
}
}
@Override
public void afterEachClearCaches() {
if (CLEANUP_DATA) {
super.afterEachClearCaches();
}
}
}

View File

@ -0,0 +1,87 @@
#################
### MANDATORY ###
#################
# Answer lists (ValueSets of potential answers/values for LOINC "questions")
## File must be present
loinc.answerlist.file=AccessoryFiles/AnswerFile/AnswerList.csv
# Answer list links (connects LOINC observation codes to answer list codes)
## File must be present
loinc.answerlist.link.file=AccessoryFiles/AnswerFile/LoincAnswerListLink.csv
# Document ontology
## File must be present
loinc.document.ontology.file=AccessoryFiles/DocumentOntology/DocumentOntology.csv
# LOINC codes
## File must be present
loinc.file=LoincTable/Loinc.csv
# LOINC hierarchy
## File must be present
loinc.hierarchy.file=AccessoryFiles/MultiAxialHierarchy/MultiAxialHierarchy.csv
# IEEE medical device codes
## File must be present
loinc.ieee.medical.device.code.mapping.table.file=AccessoryFiles/LoincIeeeMedicalDeviceCodeMappingTable/LoincIeeeMedicalDeviceCodeMappingTable.csv
# Imaging document codes
## File must be present
loinc.imaging.document.codes.file=AccessoryFiles/ImagingDocuments/ImagingDocumentCodes.csv
# Part
## File must be present
loinc.part.file=AccessoryFiles/PartFile/Part.csv
# Part link
## File must be present
loinc.part.link.primary.file=AccessoryFiles/PartFile/LoincPartLink_Primary.csv
loinc.part.link.supplementary.file=AccessoryFiles/PartFile/LoincPartLink_Supplementary.csv
# Part related code mapping
## File must be present
loinc.part.related.code.mapping.file=AccessoryFiles/PartFile/PartRelatedCodeMapping.csv
# RSNA playbook
## File must be present
loinc.rsna.playbook.file=AccessoryFiles/LoincRsnaRadiologyPlaybook/LoincRsnaRadiologyPlaybook.csv
# Top 2000 codes - SI
## File must be present
loinc.top2000.common.lab.results.si.file=AccessoryFiles/Top2000Results/SI/Top2000CommonLabResultsSi.csv
# Top 2000 codes - US
## File must be present
loinc.top2000.common.lab.results.us.file=AccessoryFiles/Top2000Results/US/Top2000CommonLabResultsUs.csv
# Universal lab order ValueSet
## File must be present
loinc.universal.lab.order.valueset.file=AccessoryFiles/LoincUniversalLabOrdersValueSet/LoincUniversalLabOrdersValueSet.csv
################
### OPTIONAL ###
################
# This is the version identifier for the LOINC code system
## Key may be omitted if only a single version of LOINC is being kept.
loinc.codesystem.version=2.68
# This is the version identifier for the answer list file
## Key may be omitted
loinc.answerlist.version=Beta.1
# This is the version identifier for uploaded ConceptMap resources
## Key may be omitted
loinc.conceptmap.version=Beta.1
# Group
## Default value if key not provided: AccessoryFiles/GroupFile/Group.csv
## File may be omitted
loinc.group.file=AccessoryFiles/GroupFile/Group.csv
# Group terms
## Default value if key not provided: AccessoryFiles/GroupFile/GroupLoincTerms.csv
## File may be omitted
loinc.group.terms.file=AccessoryFiles/GroupFile/GroupLoincTerms.csv
# Parent group
## Default value if key not provided: AccessoryFiles/GroupFile/ParentGroup.csv
## File may be omitted
loinc.parent.group.file=AccessoryFiles/GroupFile/ParentGroup.csv

View File

@ -191,6 +191,11 @@ public class ProviderConstants {
*/
public static final String OPERATION_MEMBER_MATCH = "$member-match";
/**
* Operation name for the $reindex-terminology operation
*/
public static final String OPERATION_REINDEX_TERMINOLOGY = "$reindex-terminology";
@Deprecated
public static final String MARK_ALL_RESOURCES_FOR_REINDEXING = "$mark-all-resources-for-reindexing";
/**