Add reindexing support based on table instead of column

This commit is contained in:
James Agnew 2018-11-04 20:00:27 +01:00 committed by Eeva Turkka
parent ba8795ed0f
commit 11bc2c19a9
64 changed files with 1873 additions and 868 deletions

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.cli;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -25,15 +25,24 @@ import ca.uhn.fhir.jpa.migrate.Migrator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.defaultString;
public abstract class BaseMigrateDatabaseCommand<T extends Enum> extends BaseCommand {
private static final String MIGRATE_DATABASE = "migrate-database";
private Set<String> myFlags;
protected Set<String> getFlags() {
return myFlags;
}
@Override
public String getCommandDescription() {
@ -68,6 +77,7 @@ public abstract class BaseMigrateDatabaseCommand<T extends Enum> extends BaseCom
addRequiredOption(retVal, "f", "from", "Version", "The database schema version to migrate FROM");
addRequiredOption(retVal, "t", "to", "Version", "The database schema version to migrate TO");
addRequiredOption(retVal, "d", "driver", "Driver", "The database driver to use (Options are " + driverOptions() + ")");
addOptionalOption(retVal, "x", "flags", "Flags", "A comma-separated list of any specific migration flags (these flags are version specific, see migrator documentation for details)");
return retVal;
}
@ -97,6 +107,12 @@ public abstract class BaseMigrateDatabaseCommand<T extends Enum> extends BaseCom
boolean dryRun = theCommandLine.hasOption("r");
String flags = theCommandLine.getOptionValue("x");
myFlags = Arrays.stream(defaultString(flags).split(","))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toSet());
Migrator migrator = new Migrator();
migrator.setConnectionUrl(url);
migrator.setDriverType(driverType);

View File

@ -42,7 +42,7 @@ public class HapiMigrateDatabaseCommand extends BaseMigrateDatabaseCommand<Versi
@Override
protected void addTasks(Migrator theMigrator, VersionEnum theFrom, VersionEnum theTo) {
List<BaseTask<?>> tasks = new HapiFhirJpaMigrationTasks().getTasks(theFrom, theTo);
List<BaseTask<?>> tasks = new HapiFhirJpaMigrationTasks(getFlags()).getTasks(theFrom, theTo);
tasks.forEach(theMigrator::addTask);
}
}

View File

@ -7,14 +7,26 @@ import org.apache.commons.io.IOUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobCreator;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class HapiMigrateDatabaseCommandTest {
@ -25,39 +37,20 @@ public class HapiMigrateDatabaseCommandTest {
}
@Test
public void testMigrate() throws IOException {
public void testMigrate_340_350() throws IOException {
File directory = new File("target/migrator_derby_test_340_350");
if (directory.exists()) {
FileUtils.deleteDirectory(directory);
}
String url = "jdbc:derby:directory:target/migrator_derby_test_340_350;create=true";
String url = "jdbc:derby:directory:" + directory.getAbsolutePath() + ";create=true";
DriverTypeEnum.ConnectionProperties connectionProperties = DriverTypeEnum.DERBY_EMBEDDED.newConnectionProperties(url, "", "");
String script = IOUtils.toString(HapiMigrateDatabaseCommandTest.class.getResourceAsStream("/persistence_create_derby107_340.sql"), Charsets.UTF_8);
List<String> scriptStatements = new ArrayList<>(Arrays.asList(script.split("\n")));
for (int i = 0; i < scriptStatements.size(); i++) {
String nextStatement = scriptStatements.get(i);
if (isBlank(nextStatement)) {
scriptStatements.remove(i);
i--;
continue;
}
String initSql = "/persistence_create_derby107_340.sql";
executeSqlStatements(connectionProperties, initSql);
nextStatement = nextStatement.trim();
while (nextStatement.endsWith(";")) {
nextStatement = nextStatement.substring(0, nextStatement.length() - 1);
}
scriptStatements.set(i, nextStatement);
}
connectionProperties.getTxTemplate().execute(t -> {
for (String next : scriptStatements) {
connectionProperties.newJdbcTemplate().execute(next);
}
return null;
});
seedDatabase340(connectionProperties);
ourLog.info("**********************************************");
ourLog.info("Done Setup, Starting Dry Run...");
@ -75,6 +68,13 @@ public class HapiMigrateDatabaseCommandTest {
};
App.main(args);
connectionProperties.getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = connectionProperties.newJdbcTemplate();
List<Map<String, Object>> values = jdbcTemplate.queryForList("SELECT * FROM hfj_spidx_token");
assertFalse(values.get(0).keySet().contains("HASH_IDENTITY"));
return null;
});
ourLog.info("**********************************************");
ourLog.info("Done Setup, Starting Migration...");
ourLog.info("**********************************************");
@ -89,5 +89,245 @@ public class HapiMigrateDatabaseCommandTest {
"-t", "V3_5_0"
};
App.main(args);
connectionProperties.getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = connectionProperties.newJdbcTemplate();
List<Map<String, Object>> values = jdbcTemplate.queryForList("SELECT * FROM hfj_spidx_token");
assertEquals(1, values.size());
assertEquals("identifier", values.get(0).get("SP_NAME"));
assertEquals("12345678", values.get(0).get("SP_VALUE"));
assertTrue(values.get(0).keySet().contains("HASH_IDENTITY"));
assertEquals(7001889285610424179L, values.get(0).get("HASH_IDENTITY"));
return null;
});
}
@Test
public void testMigrate_340_360() throws IOException {
File directory = new File("target/migrator_derby_test_340_360");
if (directory.exists()) {
FileUtils.deleteDirectory(directory);
}
String url = "jdbc:derby:directory:" + directory.getAbsolutePath() + ";create=true";
DriverTypeEnum.ConnectionProperties connectionProperties = DriverTypeEnum.DERBY_EMBEDDED.newConnectionProperties(url, "", "");
String initSql = "/persistence_create_derby107_340.sql";
executeSqlStatements(connectionProperties, initSql);
seedDatabase340(connectionProperties);
ourLog.info("**********************************************");
ourLog.info("Done Setup, Starting Migration...");
ourLog.info("**********************************************");
String[] args = new String[]{
"migrate-database",
"-d", "DERBY_EMBEDDED",
"-u", url,
"-n", "",
"-p", "",
"-f", "V3_4_0",
"-t", "V3_6_0"
};
App.main(args);
connectionProperties.getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = connectionProperties.newJdbcTemplate();
List<Map<String, Object>> values = jdbcTemplate.queryForList("SELECT * FROM hfj_spidx_token");
assertEquals(1, values.size());
assertEquals("identifier", values.get(0).get("SP_NAME"));
assertEquals("12345678", values.get(0).get("SP_VALUE"));
assertTrue(values.get(0).keySet().contains("HASH_IDENTITY"));
assertEquals(7001889285610424179L, values.get(0).get("HASH_IDENTITY"));
return null;
});
}
private void seedDatabase340(DriverTypeEnum.ConnectionProperties theConnectionProperties) {
theConnectionProperties.getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = theConnectionProperties.newJdbcTemplate();
jdbcTemplate.execute(
"insert into HFJ_RESOURCE (RES_DELETED_AT, RES_VERSION, FORCED_ID_PID, HAS_TAGS, RES_PUBLISHED, RES_UPDATED, SP_HAS_LINKS, HASH_SHA256, SP_INDEX_STATUS, RES_LANGUAGE, SP_CMPSTR_UNIQ_PRESENT, SP_COORDS_PRESENT, SP_DATE_PRESENT, SP_NUMBER_PRESENT, SP_QUANTITY_PRESENT, SP_STRING_PRESENT, SP_TOKEN_PRESENT, SP_URI_PRESENT, RES_PROFILE, RES_TYPE, RES_VER, RES_ID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setNull(1, Types.TIMESTAMP);
thePs.setString(2, "R4");
thePs.setNull(3, Types.BIGINT);
thePs.setBoolean(4, false);
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setTimestamp(6, new Timestamp(System.currentTimeMillis()));
thePs.setBoolean(7, false);
thePs.setNull(8, Types.VARCHAR);
thePs.setLong(9, 1L);
thePs.setNull(10, Types.VARCHAR);
thePs.setBoolean(11, false);
thePs.setBoolean(12, false);
thePs.setBoolean(13, false);
thePs.setBoolean(14, false);
thePs.setBoolean(15, false);
thePs.setBoolean(16, false);
thePs.setBoolean(17, false);
thePs.setBoolean(18, false);
thePs.setNull(19, Types.VARCHAR);
thePs.setString(20, "Patient");
thePs.setLong(21, 1L);
thePs.setLong(22, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_RES_VER (RES_DELETED_AT, RES_VERSION, FORCED_ID_PID, HAS_TAGS, RES_PUBLISHED, RES_UPDATED, RES_ENCODING, RES_TEXT, RES_ID, RES_TYPE, RES_VER, PID) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setNull(1, Types.TIMESTAMP);
thePs.setString(2, "R4");
thePs.setNull(3, Types.BIGINT);
thePs.setBoolean(4, false);
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setTimestamp(6, new Timestamp(System.currentTimeMillis()));
thePs.setString(7, "JSON");
theLobCreator.setBlobAsBytes(thePs, 8, "{\"resourceType\":\"Patient\"}".getBytes(Charsets.US_ASCII));
thePs.setLong(9, 1L);
thePs.setString(10, "Patient");
thePs.setLong(11, 1L);
thePs.setLong(12, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_SPIDX_STRING (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_VALUE_EXACT, SP_VALUE_NORMALIZED, SP_ID) values (?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setBoolean(1, false);
thePs.setString(2, "given");
thePs.setLong(3, 1L); // res-id
thePs.setString(4, "Patient");
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setString(6, "ROBERT");
thePs.setString(7, "Robert");
thePs.setLong(8, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_SPIDX_TOKEN (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_SYSTEM, SP_VALUE, SP_ID) values (?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setBoolean(1, false);
thePs.setString(2, "identifier");
thePs.setLong(3, 1L); // res-id
thePs.setString(4, "Patient");
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setString(6, "http://foo");
thePs.setString(7, "12345678");
thePs.setLong(8, 1L);
}
}
);
jdbcTemplate.execute(
"insert into HFJ_SPIDX_DATE (SP_MISSING, SP_NAME, RES_ID, RES_TYPE, SP_UPDATED, SP_VALUE_HIGH, SP_VALUE_LOW, SP_ID) values (?, ?, ?, ?, ?, ?, ?, ?)",
new AbstractLobCreatingPreparedStatementCallback(new DefaultLobHandler()) {
@Override
protected void setValues(PreparedStatement thePs, LobCreator theLobCreator) throws SQLException {
thePs.setBoolean(1, false);
thePs.setString(2, "birthdate");
thePs.setLong(3, 1L); // res-id
thePs.setString(4, "Patient");
thePs.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
thePs.setTimestamp(6, new Timestamp(1000000000L)); // value high
thePs.setTimestamp(7, new Timestamp(1000000000L)); // value low
thePs.setLong(8, 1L);
}
}
);
return null;
});
}
@Test
public void testMigrate_340_350_NoMigrateHashes() throws IOException {
File directory = new File("target/migrator_derby_test_340_350_nmh");
if (directory.exists()) {
FileUtils.deleteDirectory(directory);
}
String url = "jdbc:derby:directory:" + directory.getAbsolutePath() + ";create=true";
DriverTypeEnum.ConnectionProperties connectionProperties = DriverTypeEnum.DERBY_EMBEDDED.newConnectionProperties(url, "", "");
String initSql = "/persistence_create_derby107_340.sql";
executeSqlStatements(connectionProperties, initSql);
seedDatabase340(connectionProperties);
ourLog.info("**********************************************");
ourLog.info("Done Setup, Starting Migration...");
ourLog.info("**********************************************");
String[] args = new String[]{
"migrate-database",
"-d", "DERBY_EMBEDDED",
"-u", url,
"-n", "",
"-p", "",
"-f", "V3_4_0",
"-t", "V3_5_0",
"-x", "no-migrate-350-hashes"
};
App.main(args);
connectionProperties.getTxTemplate().execute(t -> {
JdbcTemplate jdbcTemplate = connectionProperties.newJdbcTemplate();
List<Map<String, Object>> values = jdbcTemplate.queryForList("SELECT * FROM hfj_spidx_token");
assertEquals(1, values.size());
assertEquals("identifier", values.get(0).get("SP_NAME"));
assertEquals("12345678", values.get(0).get("SP_VALUE"));
assertEquals(null, values.get(0).get("HASH_IDENTITY"));
return null;
});
}
private void executeSqlStatements(DriverTypeEnum.ConnectionProperties theConnectionProperties, String theInitSql) throws
IOException {
String script = IOUtils.toString(HapiMigrateDatabaseCommandTest.class.getResourceAsStream(theInitSql), Charsets.UTF_8);
List<String> scriptStatements = new ArrayList<>(Arrays.asList(script.split("\n")));
for (int i = 0; i < scriptStatements.size(); i++) {
String nextStatement = scriptStatements.get(i);
if (isBlank(nextStatement)) {
scriptStatements.remove(i);
i--;
continue;
}
nextStatement = nextStatement.trim();
while (nextStatement.endsWith(";")) {
nextStatement = nextStatement.substring(0, nextStatement.length() - 1);
}
scriptStatements.set(i, nextStatement);
}
theConnectionProperties.getTxTemplate().execute(t -> {
for (String next : scriptStatements) {
theConnectionProperties.newJdbcTemplate().execute(next);
}
return null;
});
}
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.config;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -25,6 +25,8 @@ import ca.uhn.fhir.i18n.HapiLocalizer;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.search.*;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexingSvcImpl;
import ca.uhn.fhir.jpa.search.warm.CacheWarmingSvcImpl;
import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
@ -32,8 +34,6 @@ import ca.uhn.fhir.jpa.sp.SearchParamPresenceSvcImpl;
import ca.uhn.fhir.jpa.subscription.email.SubscriptionEmailInterceptor;
import ca.uhn.fhir.jpa.subscription.resthook.SubscriptionRestHookInterceptor;
import ca.uhn.fhir.jpa.subscription.websocket.SubscriptionWebsocketInterceptor;
import ca.uhn.fhir.jpa.util.IReindexController;
import ca.uhn.fhir.jpa.util.ReindexController;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.hibernate.query.criteria.LiteralHandlingMode;
@ -60,6 +60,7 @@ import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@Configuration
@EnableScheduling
@EnableJpaRepositories(basePackages = "ca.uhn.fhir.jpa.dao.data")
@ -150,11 +151,6 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return new HibernateJpaDialect();
}
@Bean
public IReindexController reindexController() {
return new ReindexController();
}
@Bean()
public ScheduledExecutorService scheduledExecutorService() {
ScheduledExecutorFactoryBean b = new ScheduledExecutorFactoryBean();
@ -163,7 +159,7 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return b.getObject();
}
@Bean(name="mySubscriptionTriggeringProvider")
@Bean(name = "mySubscriptionTriggeringProvider")
@Lazy
public SubscriptionTriggeringProvider subscriptionTriggeringProvider() {
return new SubscriptionTriggeringProvider();
@ -215,6 +211,11 @@ public abstract class BaseConfig implements SchedulingConfigurer {
return retVal;
}
@Bean
public IResourceReindexingSvc resourceReindexingSvc() {
return new ResourceReindexingSvcImpl();
}
public static void configureEntityManagerFactory(LocalContainerEntityManagerFactoryBean theFactory, FhirContext theCtx) {
theFactory.setJpaDialect(hibernateJpaDialect(theCtx.getLocalizer()));
theFactory.setPackagesToScan("ca.uhn.fhir.jpa.entity");

View File

@ -712,20 +712,6 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> implements IDao,
return dao;
}
protected IFhirResourceDao<?> getDaoOrThrowException(Class<? extends IBaseResource> theClass) {
IFhirResourceDao<? extends IBaseResource> retVal = getDao(theClass);
if (retVal == null) {
List<String> supportedResourceTypes = getDaos()
.keySet()
.stream()
.map(t -> myContext.getResourceDefinition(t).getName())
.sorted()
.collect(Collectors.toList());
throw new InvalidRequestException("Unable to process request, this server does not know how to handle resources of type " + getContext().getResourceDefinition(theClass).getName() + " - Can handle: " + supportedResourceTypes);
}
return retVal;
}
private Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> getDaos() {
if (myResourceTypeToDao == null) {
Map<Class<? extends IBaseResource>, IFhirResourceDao<?>> resourceTypeToDao = new HashMap<>();

View File

@ -29,10 +29,10 @@ import ca.uhn.fhir.jpa.dao.data.ISearchResultDao;
import ca.uhn.fhir.jpa.entity.*;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.util.DeleteConflict;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.jpa.util.IReindexController;
import ca.uhn.fhir.jpa.util.jsonpatch.JsonPatchUtils;
import ca.uhn.fhir.jpa.util.xmlpatch.XmlPatchUtils;
import ca.uhn.fhir.model.api.*;
@ -42,7 +42,6 @@ import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.ParameterUtil;
import ca.uhn.fhir.rest.param.QualifierDetails;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.exceptions.*;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
@ -91,8 +90,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
private String mySecondaryPrimaryKeyParamName;
@Autowired
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IReindexController myReindexController;
@Override
public void addTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) {
@ -624,22 +621,21 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
TransactionTemplate txTemplate = new TransactionTemplate(myPlatformTransactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
Integer updatedCount = txTemplate.execute(new TransactionCallback<Integer>() {
@Override
public @NonNull
Integer doInTransaction(@Nonnull TransactionStatus theStatus) {
return myResourceTableDao.markResourcesOfTypeAsRequiringReindexing(resourceType);
}
txTemplate.execute(t->{
myResourceReindexingSvc.markAllResourcesForReindexing(resourceType);
return null;
});
ourLog.debug("Marked {} resources for reindexing", updatedCount);
ourLog.debug("Marked resources of type {} for reindexing", resourceType);
}
}
mySearchParamRegistry.requestRefresh();
myReindexController.requestReindex();
}
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Override
public <MT extends IBaseMetaType> MT metaAddOperation(IIdType theResourceId, MT theMetaAdd, RequestDetails theRequestDetails) {
// Notify interceptors
@ -727,6 +723,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return retVal;
}
@SuppressWarnings("JpaQlInspection")
@Override
public <MT extends IBaseMetaType> MT metaGetOperation(Class<MT> theType, RequestDetails theRequestDetails) {
// Notify interceptors

View File

@ -3,42 +3,28 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.entity.ForcedId;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.jpa.util.ReindexFailureException;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hibernate.search.util.impl.Executors;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.Query;
import java.util.*;
import java.util.concurrent.*;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.commons.lang3.StringUtils.isBlank;
/*
* #%L
* HAPI FHIR JPA Server
@ -48,9 +34,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -76,71 +62,7 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
private PlatformTransactionManager myTxManager;
@Autowired
private IResourceTableDao myResourceTableDao;
private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
private int doPerformReindexingPass(final Integer theCount) {
/*
* If any search parameters have been recently added or changed,
* this makes sure that the cache has been reloaded to reflect
* them.
*/
mySearchParamRegistry.refreshCacheIfNecessary();
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRED);
return doPerformReindexingPassForResources(theCount, txTemplate);
}
@SuppressWarnings("ConstantConditions")
private int doPerformReindexingPassForResources(final Integer theCount, TransactionTemplate txTemplate) {
// Determine the IDs needing reindexing
List<Long> idsToReindex = txTemplate.execute(theStatus -> {
int maxResult = 500;
if (theCount != null) {
maxResult = Math.min(theCount, 2000);
}
maxResult = Math.max(maxResult, 10);
ourLog.debug("Beginning indexing query with maximum {}", maxResult);
return myResourceTableDao
.findIdsOfResourcesRequiringReindexing(new PageRequest(0, maxResult))
.getContent();
});
// If no IDs need reindexing, we're good here
if (idsToReindex.isEmpty()) {
return 0;
}
// Reindex
StopWatch sw = new StopWatch();
// Execute each reindex in a task within a threadpool
int threadCount = getConfig().getReindexThreadCount();
RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
myReindexingThreadFactory,
rejectHandler
);
List<Future<?>> futures = new ArrayList<>();
for (Long nextId : idsToReindex) {
futures.add(executor.submit(new ResourceReindexingTask(nextId)));
}
for (Future<?> next : futures) {
try {
next.get();
} catch (Exception e) {
throw new InternalErrorException("Failed to reindex: ", e);
}
}
executor.shutdown();
ourLog.info("Reindexed {} resources in {} threads - {}ms/resource", idsToReindex.size(), threadCount, sw.getMillisPerOperation(idsToReindex.size()));
return idsToReindex.size();
}
@Override
@Transactional(propagation = Propagation.REQUIRED)
@ -182,165 +104,5 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
return retVal;
}
@Transactional()
@Override
public int markAllResourcesForReindexing() {
ourLog.info("Marking all resources as needing reindexing");
int retVal = myEntityManager.createQuery("UPDATE " + ResourceTable.class.getSimpleName() + " t SET t.myIndexStatus = null").executeUpdate();
ourLog.info("Marking all concepts as needing reindexing");
retVal += myTermConceptDao.markAllForReindexing();
ourLog.info("Done marking reindexing");
return retVal;
}
private void markResourceAsIndexingFailed(final long theId) {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_REQUIRES_NEW);
txTemplate.execute(new TransactionCallback<Void>() {
@Override
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
ourLog.info("Marking resource with PID {} as indexing_failed", new Object[] {theId});
Query q = myEntityManager.createQuery("UPDATE ResourceTable t SET t.myIndexStatus = :status WHERE t.myId = :id");
q.setParameter("status", INDEX_STATUS_INDEXING_FAILED);
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
return null;
}
});
}
@Override
@Transactional(propagation = Propagation.NEVER)
public Integer performReindexingPass(final Integer theCount) {
if (getConfig().isStatusBasedReindexingDisabled()) {
return -1;
}
if (!myReindexLock.tryLock()) {
return -1;
}
try {
return doPerformReindexingPass(theCount);
} catch (ReindexFailureException e) {
ourLog.warn("Reindexing failed for resource {}", e.getResourceId());
markResourceAsIndexingFailed(e.getResourceId());
return -1;
} finally {
myReindexLock.unlock();
}
}
private class ResourceReindexingTask implements Runnable {
private final Long myNextId;
public ResourceReindexingTask(Long theNextId) {
myNextId = theNextId;
}
@SuppressWarnings("unchecked")
@Override
public void run() {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.afterPropertiesSet();
Throwable reindexFailure;
try {
reindexFailure = txTemplate.execute(new TransactionCallback<Throwable>() {
@Override
public Throwable doInTransaction(TransactionStatus theStatus) {
ResourceTable resourceTable = myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
try {
/*
* This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
*/
ForcedId forcedId = resourceTable.getForcedId();
if (forcedId != null) {
if (isBlank(forcedId.getResourceType())) {
ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
forcedId.setResourceType(resourceTable.getResourceType());
myForcedIdDao.save(forcedId);
}
}
final IBaseResource resource = toResource(resourceTable, false);
Class<? extends IBaseResource> resourceClass = getContext().getResourceDefinition(resourceTable.getResourceType()).getImplementingClass();
@SuppressWarnings("rawtypes") final IFhirResourceDao dao = getDaoOrThrowException(resourceClass);
dao.reindex(resource, resourceTable);
return null;
} catch (Exception e) {
ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
theStatus.setRollbackOnly();
return e;
}
}
});
} catch (ResourceVersionConflictException e) {
/*
* We reindex in multiple threads, so it's technically possible that two threads try
* to index resources that cause a constraint error now (i.e. because a unique index has been
* added that didn't previously exist). In this case, one of the threads would succeed and
* not get this error, so we'll let the other one fail and try
* again later.
*/
ourLog.info("Failed to reindex {} because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
reindexFailure = null;
}
if (reindexFailure != null) {
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
myResourceTableDao.updateStatusToErrored(myNextId);
}
});
}
}
}
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,14 +22,17 @@ package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import org.apache.commons.lang3.Validate;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DaoRegistry implements ApplicationContextAware {
private ApplicationContext myAppCtx;
@ -55,11 +58,23 @@ public class DaoRegistry implements ApplicationContextAware {
public IFhirResourceDao<?> getResourceDao(String theResourceName) {
IFhirResourceDao<?> retVal = getResourceNameToResourceDao().get(theResourceName);
Validate.notNull(retVal, "No DAO exists for resource type %s - Have: %s", theResourceName, myResourceNameToResourceDao);
if (retVal == null) {
List<String> supportedResourceTypes = getResourceNameToResourceDao()
.keySet()
.stream()
.sorted()
.collect(Collectors.toList());
throw new InvalidRequestException("Unable to process request, this server does not know how to handle resources of type " + theResourceName + " - Can handle: " + supportedResourceTypes);
}
return retVal;
}
public <T extends IBaseResource> IFhirResourceDao<T> getResourceDao(Class<T> theResourceType) {
String resourceName = myCtx.getResourceDefinition(theResourceType).getName();
return (IFhirResourceDao<T>) getResourceDao(resourceName);
}
private Map<String, IFhirResourceDao<?>> getResourceNameToResourceDao() {
Map<String, IFhirResourceDao<?>> retVal = myResourceNameToResourceDao;
if (retVal == null || retVal.isEmpty()) {

View File

@ -81,6 +81,8 @@ public class FhirSystemDaoDstu2 extends BaseHapiFhirSystemDao<Bundle, MetaDt> {
@Autowired
private PlatformTransactionManager myTxManager;
@Autowired
private DaoRegistry myDaoRegistry;
private Bundle batch(final RequestDetails theRequestDetails, Bundle theRequest) {
ourLog.info("Beginning batch with {} resources", theRequest.getEntry().size());
@ -363,7 +365,7 @@ public class FhirSystemDaoDstu2 extends BaseHapiFhirSystemDao<Bundle, MetaDt> {
case POST: {
// CREATE
@SuppressWarnings("rawtypes")
IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(res.getClass());
res.setId((String) null);
DaoMethodOutcome outcome;
outcome = resourceDao.create(res, nextReqEntry.getRequest().getIfNoneExist(), false, theRequestDetails);
@ -403,7 +405,7 @@ public class FhirSystemDaoDstu2 extends BaseHapiFhirSystemDao<Bundle, MetaDt> {
case PUT: {
// UPDATE
@SuppressWarnings("rawtypes")
IFhirResourceDao resourceDao = getDaoOrThrowException(res.getClass());
IFhirResourceDao resourceDao = myDaoRegistry.getResourceDao(res.getClass());
DaoMethodOutcome outcome;

View File

@ -1,10 +1,5 @@
package ca.uhn.fhir.jpa.dao;
import java.util.Collection;
import java.util.Set;
import org.hl7.fhir.instance.model.api.IBaseResource;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.context.RuntimeSearchParam;
@ -13,6 +8,10 @@ import ca.uhn.fhir.jpa.entity.IBaseResourceEntity;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.entity.ResourceTag;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import org.hl7.fhir.instance.model.api.IBaseResource;
import java.util.Collection;
import java.util.Set;
/*
* #%L

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.dao;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -53,13 +53,6 @@ public interface IFhirSystemDao<T, MT> extends IDao {
IBundleProvider history(Date theDate, Date theUntil, RequestDetails theRequestDetails);
/**
* Marks all indexes as needing fresh indexing
*
* @return Returns the number of affected rows
*/
int markAllResourcesForReindexing();
/**
* Not supported for DSTU1
*
@ -67,8 +60,6 @@ public interface IFhirSystemDao<T, MT> extends IDao {
*/
MT metaGetOperation(RequestDetails theRequestDetails);
Integer performReindexingPass(Integer theCount);
T transaction(RequestDetails theRequestDetails, T theResources);
}

View File

@ -43,7 +43,7 @@ public interface ISearchBuilder {
FhirContext theContext, IDao theDao);
Set<Long> loadIncludes(IDao theCallingDao, FhirContext theContext, EntityManager theEntityManager, Collection<Long> theMatches, Set<Include> theRevIncludes, boolean theReverseMode,
DateRangeParam theLastUpdated);
DateRangeParam theLastUpdated, String theSearchIdOrDescription);
/**
* How many results may be fetched at once

View File

@ -1958,7 +1958,7 @@ public class SearchBuilder implements ISearchBuilder {
*/
@Override
public HashSet<Long> loadIncludes(IDao theCallingDao, FhirContext theContext, EntityManager theEntityManager, Collection<Long> theMatches, Set<Include> theRevIncludes,
boolean theReverseMode, DateRangeParam theLastUpdated) {
boolean theReverseMode, DateRangeParam theLastUpdated, String theSearchIdOrDescription) {
if (theMatches.size() == 0) {
return new HashSet<>();
}
@ -2080,7 +2080,7 @@ public class SearchBuilder implements ISearchBuilder {
nextRoundMatches = pidsToInclude;
} while (includes.size() > 0 && nextRoundMatches.size() > 0 && addedSomeThisRound);
ourLog.info("Loaded {} {} in {} rounds and {} ms", allAdded.size(), theReverseMode ? "_revincludes" : "_includes", roundCounts, w.getMillisAndRestart());
ourLog.info("Loaded {} {} in {} rounds and {} ms for search {}", allAdded.size(), theReverseMode ? "_revincludes" : "_includes", roundCounts, w.getMillisAndRestart(), theSearchIdOrDescription);
return allAdded;
}
@ -2316,7 +2316,7 @@ public class SearchBuilder implements ISearchBuilder {
myCurrentOffset = end;
Collection<Long> pidsToScan = myCurrentPids.subList(start, end);
Set<Include> includes = Collections.singleton(new Include("*", true));
Set<Long> newPids = loadIncludes(myCallingDao, myContext, myEntityManager, pidsToScan, includes, false, myParams.getLastUpdated());
Set<Long> newPids = loadIncludes(myCallingDao, myContext, myEntityManager, pidsToScan, includes, false, myParams.getLastUpdated(), mySearchUuid);
myCurrentIterator = newPids.iterator();
}

View File

@ -83,6 +83,8 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
private FhirContext myContext;
@Autowired
private ITransactionProcessorVersionAdapter<BUNDLE, BUNDLEENTRY> myVersionAdapter;
@Autowired
private DaoRegistry myDaoRegistry;
public static boolean isPlaceholder(IIdType theId) {
if (theId != null && theId.getValue() != null) {
@ -749,7 +751,7 @@ public class TransactionProcessor<BUNDLE extends IBaseBundle, BUNDLEENTRY> {
}
private IFhirResourceDao getDaoOrThrowException(Class<? extends IBaseResource> theClass) {
return myDao.getDaoOrThrowException(theClass);
return myDaoRegistry.getResourceDao(theClass);
}
protected void flushJpaSession() {

View File

@ -0,0 +1,58 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Collection;
import java.util.Date;
import java.util.List;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IResourceReindexJobDao extends JpaRepository<ResourceReindexJobEntity, Long> {
@Modifying
@Query("UPDATE ResourceReindexJobEntity j SET j.myDeleted = true WHERE j.myResourceType = :type")
void markAllOfTypeAsDeleted(@Param("type") String theType);
@Modifying
@Query("UPDATE ResourceReindexJobEntity j SET j.myDeleted = true")
void markAllOfTypeAsDeleted();
@Modifying
@Query("UPDATE ResourceReindexJobEntity j SET j.myDeleted = true WHERE j.myId = :pid")
void markAsDeletedById(@Param("pid") Long theId);
@Query("SELECT j FROM ResourceReindexJobEntity j WHERE j.myDeleted = :deleted")
List<ResourceReindexJobEntity> findAll(Pageable thePage, @Param("deleted") boolean theDeleted);
@Modifying
@Query("UPDATE ResourceReindexJobEntity j SET j.mySuspendedUntil = :suspendedUntil")
void setSuspendedUntil(@Param("suspendedUntil") Date theSuspendedUntil);
@Modifying
@Query("UPDATE ResourceReindexJobEntity j SET j.myThresholdLow = :low WHERE j.myId = :id")
void setThresholdLow(@Param("id") Long theId, @Param("low") Date theLow);
}

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@ -9,6 +8,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -21,9 +21,9 @@ import java.util.Map;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -43,17 +43,16 @@ public interface IResourceTableDao extends JpaRepository<ResourceTable, Long> {
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myId = :resid AND t.myResourceType = :restype AND t.myDeleted IS NOT NULL")
Slice<Long> findIdsOfDeletedResourcesOfType(Pageable thePageable, @Param("resid") Long theResourceId, @Param("restype") String theResourceName);
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myIndexStatus IS NULL")
Slice<Long> findIdsOfResourcesRequiringReindexing(Pageable thePageable);
@Query("SELECT t.myResourceType as type, COUNT(*) as count FROM ResourceTable t GROUP BY t.myResourceType")
@Query("SELECT t.myResourceType as type, COUNT(t.myResourceType) as count FROM ResourceTable t GROUP BY t.myResourceType")
List<Map<?, ?>> getResourceCounts();
@Modifying
@Query("UPDATE ResourceTable r SET r.myIndexStatus = null WHERE r.myResourceType = :restype")
int markResourcesOfTypeAsRequiringReindexing(@Param("restype") String theResourceType);
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high ORDER BY t.myUpdated ASC")
Slice<Long> findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(Pageable thePage,@Param("low") Date theLow, @Param("high")Date theHigh);
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high AND t.myResourceType = :restype ORDER BY t.myUpdated ASC")
Slice<Long> findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(Pageable thePage,@Param("restype") String theResourceType, @Param("low") Date theLow, @Param("high")Date theHigh);
@Modifying
@Query("UPDATE ResourceTable r SET r.myIndexStatus = " + BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED + " WHERE r.myId = :resid")
void updateStatusToErrored(@Param("resid") Long theId);
@Query("UPDATE ResourceTable t SET t.myIndexStatus = :status WHERE t.myId = :id")
void updateIndexStatus(@Param("id") Long theId, @Param("status") Long theIndexStatus);
}

View File

@ -20,11 +20,15 @@ package ca.uhn.fhir.jpa.dao.index;
* #L%
*/
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import javax.persistence.EntityManager;
import ca.uhn.fhir.jpa.entity.BaseHasResource;
import ca.uhn.fhir.jpa.entity.IBaseResourceEntity;
import ca.uhn.fhir.jpa.entity.ResourceTag;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -50,4 +54,5 @@ public interface IndexingSupport {
public Long translateForcedIdToPid(String theResourceName, String theResourceId);
public String toResourceName(Class<? extends IBaseResource> theResourceType);
public IResourceIndexedCompositeStringUniqueDao getResourceIndexedCompositeStringUniqueDao();
}

View File

@ -0,0 +1,113 @@
package ca.uhn.fhir.jpa.entity;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import com.google.common.annotations.VisibleForTesting;
import javax.persistence.*;
import java.io.Serializable;
import java.util.Date;
@Entity
@Table(name = "HFJ_RES_REINDEX_JOB")
public class ResourceReindexJobEntity implements Serializable {
@Id
@SequenceGenerator(name = "SEQ_RES_REINDEX_JOB", sequenceName = "SEQ_RES_REINDEX_JOB")
@GeneratedValue(strategy = GenerationType.AUTO, generator = "SEQ_RES_REINDEX_JOB")
@Column(name = "PID")
private Long myId;
@Column(name = "RES_TYPE", nullable = true)
private String myResourceType;
/**
* Inclusive
*/
@Column(name = "UPDATE_THRESHOLD_HIGH", nullable = false)
@Temporal(TemporalType.TIMESTAMP)
private Date myThresholdHigh;
@Column(name = "JOB_DELETED", nullable = false)
private boolean myDeleted;
/**
* Inclusive
*/
@Column(name = "UPDATE_THRESHOLD_LOW", nullable = true)
@Temporal(TemporalType.TIMESTAMP)
private Date myThresholdLow;
@Column(name = "SUSPENDED_UNTIL", nullable = true)
@Temporal(TemporalType.TIMESTAMP)
private Date mySuspendedUntil;
public Date getSuspendedUntil() {
return mySuspendedUntil;
}
public void setSuspendedUntil(Date theSuspendedUntil) {
mySuspendedUntil = theSuspendedUntil;
}
/**
* Inclusive
*/
public Date getThresholdLow() {
return myThresholdLow;
}
/**
* Inclusive
*/
public void setThresholdLow(Date theThresholdLow) {
myThresholdLow = theThresholdLow;
}
public String getResourceType() {
return myResourceType;
}
public void setResourceType(String theResourceType) {
myResourceType = theResourceType;
}
/**
* Inclusive
*/
public Date getThresholdHigh() {
return myThresholdHigh;
}
/**
* Inclusive
*/
public void setThresholdHigh(Date theThresholdHigh) {
myThresholdHigh = theThresholdHigh;
}
public Long getId() {
return myId;
}
@VisibleForTesting
public void setIdForUnitTest(long theId) {
myId = theId;
}
public void setDeleted(boolean theDeleted) {
myDeleted = theDeleted;
}
}

View File

@ -564,9 +564,7 @@ public class ResourceTable extends BaseHasResource implements Serializable {
retVal.setPublished(getPublished());
retVal.setUpdated(getUpdated());
// retVal.setEncoding(getEncoding());
retVal.setFhirVersion(getFhirVersion());
// retVal.setResource(getResource());
retVal.setDeleted(getDeleted());
retVal.setForcedId(getForcedId());

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.provider;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,6 +21,7 @@ package ca.uhn.fhir.jpa.provider;
*/
import ca.uhn.fhir.jpa.dao.IFhirSystemDao;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.rest.annotation.At;
@ -31,6 +32,7 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.Parameters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import javax.servlet.http.HttpServletRequest;
@ -42,11 +44,17 @@ public class BaseJpaSystemProvider<T, MT> extends BaseJpaProvider {
public static final String PERFORM_REINDEXING_PASS = "$perform-reindexing-pass";
private IFhirSystemDao<T, MT> myDao;
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
public BaseJpaSystemProvider() {
// nothing
}
protected IResourceReindexingSvc getResourceReindexingSvc() {
return myResourceReindexingSvc;
}
protected Parameters doExpunge(IPrimitiveType<? extends Integer> theLimit, IPrimitiveType<? extends Boolean> theExpungeDeletedResources, IPrimitiveType<? extends Boolean> theExpungeOldVersions, IPrimitiveType<? extends Boolean> theExpungeEverything) {
ExpungeOptions options = createExpungeOptions(theLimit, theExpungeDeletedResources, theExpungeOldVersions, theExpungeEverything);
ExpungeOutcome outcome = getDao().expunge(options);

View File

@ -34,11 +34,11 @@ public abstract class BaseJpaSystemProviderDstu2Plus<T, MT> extends BaseJpaSyste
@OperationParam(name = "status")
})
public IBaseResource markAllResourcesForReindexing() {
Integer count = getDao().markAllResourcesForReindexing();
getResourceReindexingSvc().markAllResourcesForReindexing();
IBaseParameters retVal = ParametersUtil.newInstance(getContext());
IPrimitiveType<?> string = ParametersUtil.createString(getContext(), "Marked " + count + " resources");
IPrimitiveType<?> string = ParametersUtil.createString(getContext(), "Marked resources");
ParametersUtil.addParameterToParameters(getContext(), retVal, "status", string);
return retVal;
@ -48,7 +48,7 @@ public abstract class BaseJpaSystemProviderDstu2Plus<T, MT> extends BaseJpaSyste
@OperationParam(name = "status")
})
public IBaseResource performReindexingPass() {
Integer count = getDao().performReindexingPass(1000);
Integer count = getResourceReindexingSvc().runReindexingPass();
IBaseParameters retVal = ParametersUtil.newInstance(getContext());

View File

@ -276,8 +276,8 @@ public class PersistedJpaBundleProvider implements IBundleProvider {
protected List<IBaseResource> toResourceList(ISearchBuilder sb, List<Long> pidsSubList) {
Set<Long> includedPids = new HashSet<>();
if (mySearchEntity.getSearchType() == SearchTypeEnum.SEARCH) {
includedPids.addAll(sb.loadIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toRevIncludesList(), true, mySearchEntity.getLastUpdated()));
includedPids.addAll(sb.loadIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toIncludesList(), false, mySearchEntity.getLastUpdated()));
includedPids.addAll(sb.loadIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toRevIncludesList(), true, mySearchEntity.getLastUpdated(), myUuid));
includedPids.addAll(sb.loadIncludes(myDao, myContext, myEntityManager, pidsSubList, mySearchEntity.toIncludesList(), false, mySearchEntity.getLastUpdated(), myUuid));
}
// Execute the query and make sure we return distinct results

View File

@ -313,8 +313,8 @@ public class SearchCoordinatorSvcImpl implements ISearchCoordinatorSvc {
* individually for pages as we return them to clients
*/
final Set<Long> includedPids = new HashSet<>();
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated()));
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated()));
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getRevIncludes(), true, theParams.getLastUpdated(), "(synchronous)"));
includedPids.addAll(sb.loadIncludes(theCallingDao, myContext, myEntityManager, pids, theParams.getIncludes(), false, theParams.getLastUpdated(), "(synchronous)"));
List<IBaseResource> resources = new ArrayList<>();
sb.loadResourcesByPid(pids, resources, includedPids, false, myEntityManager, myContext, theCallingDao);

View File

@ -0,0 +1,34 @@
package ca.uhn.fhir.jpa.search.reindex;
public interface IResourceReindexingSvc {
/**
* Marks all indexes as needing fresh indexing
*/
void markAllResourcesForReindexing();
/**
* Marks all indexes of the given type as needing fresh indexing
*/
void markAllResourcesForReindexing(String theType);
/**
* Called automatically by the job scheduler
*
* @return Returns null if the system did not attempt to perform a pass because one was
* already proceeding. Otherwise, returns the number of resources affected.
*/
Integer runReindexingPass();
/**
* Does the same thing as {@link #runReindexingPass()} but makes sure to perform at
* least one pass even if one is half finished
*/
Integer forceReindexingPass();
/**
* Cancels all running and future reindexing jobs. This is mainly intended
* to be used by unit tests.
*/
void cancelAndPurgeAllJobs();
}

View File

@ -0,0 +1,450 @@
package ca.uhn.fhir.jpa.search.reindex;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ForcedId;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.util.StopWatch;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.search.util.impl.Executors;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.PersistenceContextType;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class ResourceReindexingSvcImpl implements IResourceReindexingSvc {
private static final Date BEGINNING_OF_TIME = new Date(0);
private static final Logger ourLog = LoggerFactory.getLogger(ResourceReindexingSvcImpl.class);
private final ReentrantLock myIndexingLock = new ReentrantLock();
@Autowired
private IResourceReindexJobDao myReindexJobDao;
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private PlatformTransactionManager myTxManager;
private TransactionTemplate myTxTemplate;
private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
private ThreadPoolExecutor myTaskExecutor;
@Autowired
private IResourceTableDao myResourceTableDao;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private IForcedIdDao myForcedIdDao;
@Autowired
private FhirContext myContext;
@PersistenceContext(type = PersistenceContextType.TRANSACTION)
private EntityManager myEntityManager;
@VisibleForTesting
void setReindexJobDaoForUnitTest(IResourceReindexJobDao theReindexJobDao) {
myReindexJobDao = theReindexJobDao;
}
@VisibleForTesting
void setDaoConfigForUnitTest(DaoConfig theDaoConfig) {
myDaoConfig = theDaoConfig;
}
@VisibleForTesting
void setTxManagerForUnitTest(PlatformTransactionManager theTxManager) {
myTxManager = theTxManager;
}
@VisibleForTesting
void setResourceTableDaoForUnitTest(IResourceTableDao theResourceTableDao) {
myResourceTableDao = theResourceTableDao;
}
@VisibleForTesting
void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) {
myDaoRegistry = theDaoRegistry;
}
@VisibleForTesting
void setForcedIdDaoForUnitTest(IForcedIdDao theForcedIdDao) {
myForcedIdDao = theForcedIdDao;
}
@VisibleForTesting
void setContextForUnitTest(FhirContext theContext) {
myContext = theContext;
}
@PostConstruct
public void start() {
myTxTemplate = new TransactionTemplate(myTxManager);
initExecutor();
}
private void initExecutor() {
// Create the threadpool executor used for reindex jobs
int reindexThreadCount = myDaoConfig.getReindexThreadCount();
RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
myTaskExecutor = new ThreadPoolExecutor(0, reindexThreadCount,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
myReindexingThreadFactory,
rejectHandler
);
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
public void markAllResourcesForReindexing() {
markAllResourcesForReindexing(null);
}
@Override
@Transactional(Transactional.TxType.REQUIRED)
public void markAllResourcesForReindexing(String theType) {
String typeDesc;
if (isNotBlank(theType)) {
myReindexJobDao.markAllOfTypeAsDeleted(theType);
typeDesc = theType;
} else {
myReindexJobDao.markAllOfTypeAsDeleted();
typeDesc = "(any)";
}
ResourceReindexJobEntity job = new ResourceReindexJobEntity();
job.setResourceType(theType);
job.setThresholdHigh(DateUtils.addMinutes(new Date(), 5));
job = myReindexJobDao.saveAndFlush(job);
ourLog.info("Marking all resources of type {} for reindexing - Got job ID[{}]", typeDesc, job.getId());
}
@Override
@Transactional(Transactional.TxType.NEVER)
@Scheduled(fixedDelay = 10 * DateUtils.MILLIS_PER_SECOND)
public Integer runReindexingPass() {
if (myIndexingLock.tryLock()) {
try {
return doReindexingPassInsideLock();
} finally {
myIndexingLock.unlock();
}
}
return null;
}
private Integer doReindexingPassInsideLock() {
expungeJobsMarkedAsDeleted();
return runReindexJobs();
}
@Override
public Integer forceReindexingPass() {
myIndexingLock.lock();
try {
return doReindexingPassInsideLock();
} finally {
myIndexingLock.unlock();
}
}
@Override
public void cancelAndPurgeAllJobs() {
ourLog.info("Cancelling and purging all resource reindexing jobs");
myTxTemplate.execute(t -> {
myReindexJobDao.markAllOfTypeAsDeleted();
return null;
});
myTaskExecutor.shutdown();
initExecutor();
expungeJobsMarkedAsDeleted();
}
private Integer runReindexJobs() {
Collection<ResourceReindexJobEntity> jobs = myTxTemplate.execute(t -> myReindexJobDao.findAll(PageRequest.of(0, 10), false));
assert jobs != null;
int count = 0;
for (ResourceReindexJobEntity next : jobs) {
if (next.getThresholdHigh().getTime() < System.currentTimeMillis()) {
markJobAsDeleted(next);
continue;
}
count += runReindexJob(next);
}
return count;
}
private void markJobAsDeleted(ResourceReindexJobEntity next) {
myTxTemplate.execute(t -> {
myReindexJobDao.markAsDeletedById(next.getId());
return null;
});
}
private int runReindexJob(ResourceReindexJobEntity theJob) {
if (theJob.getSuspendedUntil() != null) {
if (theJob.getSuspendedUntil().getTime() > System.currentTimeMillis()) {
return 0;
}
}
ourLog.info("Performing reindex pass for JOB[{}]", theJob.getId());
StopWatch sw = new StopWatch();
AtomicInteger counter = new AtomicInteger();
// Calculate range
Date low = theJob.getThresholdLow() != null ? theJob.getThresholdLow() : BEGINNING_OF_TIME;
Date high = theJob.getThresholdHigh();
// Query for resources within threshold
Slice<Long> range = myTxTemplate.execute(t -> {
PageRequest page = PageRequest.of(0, 10000);
if (isNotBlank(theJob.getResourceType())) {
return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, theJob.getResourceType(), low, high);
} else {
return myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(page, low, high);
}
});
Validate.notNull(range);
int count = range.getNumberOfElements();
// Submit each resource requiring reindexing
List<Future<Date>> futures = range
.stream()
.map(t -> myTaskExecutor.submit(new ResourceReindexingTask(t, counter)))
.collect(Collectors.toList());
Date latestDate = null;
boolean haveMultipleDates = false;
for (Future<Date> next : futures) {
Date nextDate;
try {
nextDate = next.get();
} catch (Exception e) {
ourLog.error("Failure reindexing", e);
Date suspendedUntil = DateUtils.addMinutes(new Date(), 1);
myTxTemplate.execute(t -> {
myReindexJobDao.setSuspendedUntil(suspendedUntil);
return null;
});
return counter.get();
}
if (nextDate != null) {
if (latestDate != null) {
if (latestDate.getTime() != nextDate.getTime()) {
haveMultipleDates = true;
}
}
if (latestDate == null || latestDate.getTime() < nextDate.getTime()) {
latestDate = new Date(nextDate.getTime());
}
}
}
// Just in case we end up in some sort of infinite loop. This shouldn't happen, and couldn't really
// happen unless there were 10000 resources with the exact same update time down to the
// millisecond.
Date newLow;
if (latestDate == null) {
markJobAsDeleted(theJob);
return 0;
}
if (latestDate.getTime() == low.getTime()) {
ourLog.error("Final pass time for reindex JOB[{}] has same ending low value: {}", theJob.getId(), latestDate);
newLow = new Date(latestDate.getTime() + 1);
} else if (!haveMultipleDates) {
newLow = new Date(latestDate.getTime() + 1);
} else {
newLow = latestDate;
}
myTxTemplate.execute(t -> {
myReindexJobDao.setThresholdLow(theJob.getId(), newLow);
return null;
});
ourLog.info("Completed pass of reindex JOB[{}] - Indexed {} resources in {} ({} / sec) - Have indexed until: {}", theJob.getId(), count, sw.toString(), sw.formatThroughput(count, TimeUnit.SECONDS), theJob.getThresholdLow());
return counter.get();
}
private void expungeJobsMarkedAsDeleted() {
myTxTemplate.execute(t -> {
Collection<ResourceReindexJobEntity> toDelete = myReindexJobDao.findAll(PageRequest.of(0, 10), true);
toDelete.forEach(job -> {
ourLog.info("Purging deleted job[{}]", job.getId());
myReindexJobDao.deleteById(job.getId());
});
return null;
});
}
@SuppressWarnings("JpaQlInspection")
private void markResourceAsIndexingFailed(final long theId) {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.execute((TransactionCallback<Void>) theStatus -> {
ourLog.info("Marking resource with PID {} as indexing_failed", new Object[]{theId});
myResourceTableDao.updateIndexStatus(theId, BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED);
Query q = myEntityManager.createQuery("DELETE FROM ResourceTag t WHERE t.myResourceId = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamCoords t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamDate t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamNumber t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamQuantity t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamString t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamToken t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceIndexedSearchParamUri t WHERE t.myResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.mySourceResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
q = myEntityManager.createQuery("DELETE FROM ResourceLink t WHERE t.myTargetResourcePid = :id");
q.setParameter("id", theId);
q.executeUpdate();
return null;
});
}
private class ResourceReindexingTask implements Callable<Date> {
private final Long myNextId;
private final AtomicInteger myCounter;
private Date myUpdated;
ResourceReindexingTask(Long theNextId, AtomicInteger theCounter) {
myNextId = theNextId;
myCounter = theCounter;
}
@SuppressWarnings("unchecked")
private <T extends IBaseResource> void doReindex(ResourceTable theResourceTable, T theResource) {
RuntimeResourceDefinition resourceDefinition = myContext.getResourceDefinition(theResource.getClass());
Class<T> resourceClass = (Class<T>) resourceDefinition.getImplementingClass();
final IFhirResourceDao<T> dao = myDaoRegistry.getResourceDao(resourceClass);
dao.reindex(theResource, theResourceTable);
myCounter.incrementAndGet();
}
@Override
public Date call() {
Throwable reindexFailure;
try {
reindexFailure = myTxTemplate.execute(t -> {
ResourceTable resourceTable = myResourceTableDao.findById(myNextId).orElseThrow(IllegalStateException::new);
myUpdated = resourceTable.getUpdatedDate();
try {
/*
* This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
*/
ForcedId forcedId = resourceTable.getForcedId();
if (forcedId != null) {
if (isBlank(forcedId.getResourceType())) {
ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
forcedId.setResourceType(resourceTable.getResourceType());
myForcedIdDao.save(forcedId);
}
}
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceTable.getResourceType());
IBaseResource resource = dao.toResource(resourceTable, false);
if (resource == null) {
throw new InternalErrorException("Could not find resource version " + resourceTable.getIdDt().toUnqualified().getValue() + " in database");
}
doReindex(resourceTable, resource);
return null;
} catch (Exception e) {
ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
t.setRollbackOnly();
return e;
}
});
} catch (ResourceVersionConflictException e) {
/*
* We reindex in multiple threads, so it's technically possible that two threads try
* to index resources that cause a constraint error now (i.e. because a unique index has been
* added that didn't previously exist). In this case, one of the threads would succeed and
* not get this error, so we'll let the other one fail and try
* again later.
*/
ourLog.info("Failed to reindex {} because of a version conflict. Leaving in unindexed state: {}", e.getMessage());
reindexFailure = null;
}
if (reindexFailure != null) {
ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
markResourceAsIndexingFailed(myNextId);
}
return myUpdated;
}
}
}

View File

@ -1,34 +0,0 @@
package ca.uhn.fhir.jpa.util;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public interface IReindexController {
/**
* This method is called automatically by the scheduler
*/
void performReindexingPass();
/**
* This method requests that the reindex process happen as soon as possible
*/
void requestReindex();
}

View File

@ -1,119 +0,0 @@
package ca.uhn.fhir.jpa.util;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2018 University Health Network
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.IFhirSystemDao;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.Semaphore;
public class ReindexController implements IReindexController {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexController.class);
private final Semaphore myReindexingLock = new Semaphore(1);
@Autowired
private DaoConfig myDaoConfig;
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
private Long myDontReindexUntil;
/**
* This method is called once per minute to perform any required re-indexing.
* <p>
* If nothing if found that requires reindexing, the query will not fire again for
* a longer amount of time.
* <p>
* During most passes this will just check and find that there are no resources
* requiring re-indexing. In that case the method just returns immediately.
* If the search finds that some resources require reindexing, the system will
* do a bunch of reindexing and then return.
*/
@Scheduled(fixedDelay = DateUtils.MILLIS_PER_MINUTE)
@Transactional(propagation = Propagation.NEVER)
@Override
public void performReindexingPass() {
if (myDaoConfig.isSchedulingDisabled() || myDaoConfig.isStatusBasedReindexingDisabled()) {
return;
}
synchronized (this) {
if (myDontReindexUntil != null && myDontReindexUntil > System.currentTimeMillis()) {
return;
}
}
if (!myReindexingLock.tryAcquire()) {
ourLog.trace("Not going to reindex in parallel threads");
return;
}
Integer count;
try {
count = mySystemDao.performReindexingPass(100);
for (int i = 0; i < 50 && count != null && count != 0; i++) {
count = mySystemDao.performReindexingPass(100);
try {
Thread.sleep(DateUtils.MILLIS_PER_SECOND);
} catch (InterruptedException e) {
break;
}
}
} catch (Exception e) {
ourLog.error("Failure during reindex", e);
count = -1;
} finally {
myReindexingLock.release();
}
synchronized (this) {
if (count == null) {
ourLog.info("Reindex pass complete, no remaining resource to index");
myDontReindexUntil = System.currentTimeMillis() + DateUtils.MILLIS_PER_HOUR;
} else if (count == -1) {
// Reindexing failed
myDontReindexUntil = System.currentTimeMillis() + DateUtils.MILLIS_PER_HOUR;
} else {
ourLog.info("Reindex pass complete, {} remaining resource to index", count);
myDontReindexUntil = null;
}
}
}
/**
* Calling this will cause a reindex loop to be triggered sooner that it would otherwise
*/
@Override
public void requestReindex() {
synchronized (this) {
myDontReindexUntil = null;
}
}
}

View File

@ -4,7 +4,6 @@ import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.rest.server.interceptor.RequestValidatingInterceptor;
import ca.uhn.fhir.validation.ResultSeverityEnum;
import net.ttddyy.dsproxy.listener.SingleQueryCountHolder;
import net.ttddyy.dsproxy.listener.ThreadQueryCountHolder;
import net.ttddyy.dsproxy.listener.logging.SLF4JLogLevel;
import net.ttddyy.dsproxy.support.ProxyDataSourceBuilder;
import org.apache.commons.dbcp2.BasicDataSource;

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.VersionIndependentConcept;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
@ -33,7 +34,9 @@ import org.hl7.fhir.instance.model.api.IBaseBundle;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.*;
import org.mockito.Mockito;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
@ -75,6 +78,7 @@ public abstract class BaseJpaTest {
@Rule
public LoggingRule myLoggingRule = new LoggingRule();
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
protected ServletRequestDetails mySrd;
protected ArrayList<IServerInterceptor> myServerInterceptorList;
protected IRequestOperationCallback myRequestOperationCallback = mock(IRequestOperationCallback.class);
@ -89,7 +93,7 @@ public abstract class BaseJpaTest {
@After
public void afterValidateNoTransaction() {
PlatformTransactionManager txManager = getTxManager();
if (txManager != null) {
if (txManager instanceof JpaTransactionManager) {
JpaTransactionManager hibernateTxManager = (JpaTransactionManager) txManager;
SessionFactory sessionFactory = (SessionFactory) hibernateTxManager.getEntityManagerFactory();
AtomicBoolean isReadOnly = new AtomicBoolean();
@ -114,8 +118,9 @@ public abstract class BaseJpaTest {
}
@Before
public void beforeCreateSrd() {
mySrd = mock(ServletRequestDetails.class, Mockito.RETURNS_DEEP_STUBS);
public void beforeInitMocks() {
MockitoAnnotations.initMocks(this);
when(mySrd.getRequestOperationCallback()).thenReturn(myRequestOperationCallback);
myServerInterceptorList = new ArrayList<>();
when(mySrd.getServer().getInterceptors()).thenReturn(myServerInterceptorList);
@ -355,8 +360,9 @@ public abstract class BaseJpaTest {
return bundleStr;
}
public static void purgeDatabase(DaoConfig theDaoConfig, IFhirSystemDao<?, ?> theSystemDao, ISearchParamPresenceSvc theSearchParamPresenceSvc, ISearchCoordinatorSvc theSearchCoordinatorSvc, ISearchParamRegistry theSearchParamRegistry) {
public static void purgeDatabase(DaoConfig theDaoConfig, IFhirSystemDao<?, ?> theSystemDao, IResourceReindexingSvc theResourceReindexingSvc, ISearchCoordinatorSvc theSearchCoordinatorSvc, ISearchParamRegistry theSearchParamRegistry) {
theSearchCoordinatorSvc.cancelAllActiveSearches();
theResourceReindexingSvc.cancelAndPurgeAllJobs();
boolean expungeEnabled = theDaoConfig.isExpungeEnabled();
theDaoConfig.setExpungeEnabled(true);

View File

@ -12,6 +12,7 @@ import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.provider.JpaSystemProviderDstu2;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.model.dstu2.composite.CodeableConceptDt;
@ -42,7 +43,7 @@ import javax.persistence.EntityManager;
import java.io.IOException;
import java.io.InputStream;
import static org.junit.Assert.*;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@RunWith(SpringJUnit4ClassRunner.class)
@ -56,6 +57,8 @@ public abstract class BaseJpaDstu2Test extends BaseJpaTest {
@Autowired
protected ApplicationContext myAppCtx;
@Autowired
protected IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
@Qualifier("myAppointmentDaoDstu2")
protected IFhirResourceDao<Appointment> myAppointmentDao;
@Autowired
@ -197,7 +200,7 @@ public abstract class BaseJpaDstu2Test extends BaseJpaTest {
@Before
@Transactional()
public void beforePurgeDatabase() throws InterruptedException {
purgeDatabase(myDaoConfig, mySystemDao, mySearchParamPresenceSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
}
@Before

View File

@ -23,7 +23,7 @@ import org.mockito.stubbing.Answer;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
public class FhirResourceDaoDstu2InterceptorTest extends BaseJpaDstu2Test {

View File

@ -988,7 +988,9 @@ public class FhirResourceDaoDstu2SearchCustomSearchParamTest extends BaseJpaDstu
mySearchParameterDao.delete(spId, mySrd);
mySearchParamRegsitry.forceRefresh();
mySystemDao.performReindexingPass(100);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
// Try with custom gender SP
map = new SearchParameterMap();

View File

@ -3,7 +3,7 @@ package ca.uhn.fhir.jpa.dao.dstu2;
import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import java.io.IOException;

View File

@ -12,6 +12,7 @@ import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.BaseHapiTerminologySvcImpl;
import ca.uhn.fhir.jpa.term.IHapiTerminologySvc;
@ -67,6 +68,10 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
@Qualifier("myResourceCountsCache")
protected ResourceCountCache myResourceCountsCache;
@Autowired
protected IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
protected IResourceReindexJobDao myResourceReindexJobDao;
@Autowired
@Qualifier("myCoverageDaoDstu3")
protected IFhirResourceDao<Coverage> myCoverageDao;
@Autowired
@ -294,8 +299,8 @@ public abstract class BaseJpaDstu3Test extends BaseJpaTest {
@Before
@Transactional()
public void beforePurgeDatabase() throws InterruptedException {
purgeDatabase(myDaoConfig, mySystemDao, mySearchParamPresenceSvc, mySearchCoordinatorSvc, mySearchParamRegsitry);
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegsitry);
}
@Before

View File

@ -30,10 +30,9 @@ public class FhirResourceDaoDstu3CodeSystemTest extends BaseJpaDstu3Test {
CodeSystem cs = myFhirCtx.newJsonParser().parseResource(CodeSystem.class, input);
myCodeSystemDao.create(cs, mySrd);
mySystemDao.markAllResourcesForReindexing();
int outcome = mySystemDao.performReindexingPass(100);
myResourceReindexingSvc.markAllResourcesForReindexing();
int outcome= myResourceReindexingSvc.forceReindexingPass();
assertNotEquals(-1, outcome); // -1 means there was a failure
myTermSvc.saveDeferred();

View File

@ -24,7 +24,7 @@ import org.mockito.stubbing.Answer;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
public class FhirResourceDaoDstu3InterceptorTest extends BaseJpaDstu3Test {

View File

@ -994,7 +994,8 @@ public class FhirResourceDaoDstu3SearchCustomSearchParamTest extends BaseJpaDstu
mySearchParameterDao.delete(spId, mySrd);
mySearchParamRegsitry.forceRefresh();
mySystemDao.performReindexingPass(100);
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
// Try with custom gender SP
map = new SearchParameterMap();

View File

@ -5,6 +5,7 @@ import ca.uhn.fhir.jpa.config.TestDstu3WithoutLuceneConfig;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.provider.dstu3.JpaSystemProviderDstu3;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
@ -144,11 +145,13 @@ public class FhirResourceDaoDstu3SearchWithLuceneDisabledTest extends BaseJpaTes
@Autowired
@Qualifier("myJpaValidationSupportChainDstu3")
private IValidationSupport myValidationSupport;
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Before
public void beforePurgeDatabase() {
runInTransaction(() -> {
purgeDatabase(myDaoConfig, mySystemDao, mySearchParamPresenceSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
});
}

View File

@ -487,10 +487,10 @@ public class FhirResourceDaoDstu3TerminologyTest extends BaseJpaDstu3Test {
createExternalCsAndLocalVs();
mySystemDao.markAllResourcesForReindexing();
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
mySystemDao.performReindexingPass(100);
mySystemDao.performReindexingPass(100);
myHapiTerminologySvc.saveDeferred();
myHapiTerminologySvc.saveDeferred();
myHapiTerminologySvc.saveDeferred();
@ -729,17 +729,17 @@ public class FhirResourceDaoDstu3TerminologyTest extends BaseJpaDstu3Test {
include.setSystem(URL_MY_CODE_SYSTEM);
include.addConcept().setCode("ZZZZ");
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(null);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myTermSvc.saveDeferred();
mySystemDao.performReindexingPass(null);
myTermSvc.saveDeferred();
// Again
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(null);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myTermSvc.saveDeferred();
mySystemDao.performReindexingPass(null);
myTermSvc.saveDeferred();
}

View File

@ -44,7 +44,7 @@ import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
@SuppressWarnings({"unchecked", "deprecation"})

View File

@ -222,8 +222,9 @@ public class FhirResourceDaoDstu3UniqueSearchParamTest extends BaseJpaDstu3Test
createUniqueIndexCoverageBeneficiary();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
List<ResourceIndexedCompositeStringUnique> uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());

View File

@ -11,6 +11,7 @@ import ca.uhn.fhir.jpa.provider.r4.JpaSystemProviderR4;
import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.search.warm.ICacheWarmingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.jpa.term.BaseHapiTerminologySvcImpl;
@ -213,6 +214,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Autowired
protected ISearchIncludeDao mySearchIncludeDao;
@Autowired
protected IResourceReindexJobDao myResourceReindexJobDao;
@Autowired
@Qualifier("mySearchParameterDaoR4")
protected IFhirResourceDao<SearchParameter> mySearchParameterDao;
@Autowired
@ -237,6 +240,8 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Qualifier("mySystemDaoR4")
protected IFhirSystemDao<Bundle, Meta> mySystemDao;
@Autowired
protected IResourceReindexingSvc myResourceReindexingSvc;
@Autowired
@Qualifier("mySystemProviderR4")
protected JpaSystemProviderR4 mySystemProvider;
@Autowired
@ -314,7 +319,7 @@ public abstract class BaseJpaR4Test extends BaseJpaTest {
@Transactional()
public void beforePurgeDatabase() throws InterruptedException {
final EntityManager entityManager = this.myEntityManager;
purgeDatabase(myDaoConfig, mySystemDao, mySearchParamPresenceSvc, mySearchCoordinatorSvc, mySearchParamRegsitry);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegsitry);
}
@Before

View File

@ -30,10 +30,8 @@ public class FhirResourceDaoR4CodeSystemTest extends BaseJpaR4Test {
CodeSystem cs = myFhirCtx.newJsonParser().parseResource(CodeSystem.class, input);
myCodeSystemDao.create(cs, mySrd);
mySystemDao.markAllResourcesForReindexing();
int outcome = mySystemDao.performReindexingPass(100);
myResourceReindexingSvc.markAllResourcesForReindexing();
int outcome = myResourceReindexingSvc.runReindexingPass();
assertNotEquals(-1, outcome); // -1 means there was a failure
myTermSvc.saveDeferred();

View File

@ -5,10 +5,7 @@ import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
@ -83,6 +80,8 @@ public class FhirResourceDaoR4CreateTest extends BaseJpaR4Test {
Patient p = new Patient();
p.setId(IdType.newRandomUuid());
p.addName().setFamily("FAM");
p.setActive(true);
p.setBirthDateElement(new DateType("2011-01-01"));
p.getManagingOrganization().setReference(org.getId());
Bundle input = new Bundle();

View File

@ -146,8 +146,8 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test
mySearchParameterDao.create(fooSp, mySrd);
assertEquals(1, mySystemDao.performReindexingPass(100).intValue());
assertEquals(0, mySystemDao.performReindexingPass(100).intValue());
assertEquals(1, myResourceReindexingSvc.forceReindexingPass().intValue());
assertEquals(0, myResourceReindexingSvc.forceReindexingPass().intValue());
}
@ -1171,7 +1171,7 @@ public class FhirResourceDaoR4SearchCustomSearchParamTest extends BaseJpaR4Test
mySearchParameterDao.delete(spId, mySrd);
mySearchParamRegsitry.forceRefresh();
mySystemDao.performReindexingPass(100);
myResourceReindexingSvc.forceReindexingPass();
// Try with custom gender SP
map = new SearchParameterMap();

View File

@ -4,6 +4,7 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.config.TestR4WithoutLuceneConfig;
import ca.uhn.fhir.jpa.dao.*;
import ca.uhn.fhir.jpa.search.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.sp.ISearchParamPresenceSvc;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
@ -89,11 +90,13 @@ public class FhirResourceDaoR4SearchWithLuceneDisabledTest extends BaseJpaTest {
private IValidationSupport myValidationSupport;
@Autowired
private IFhirSystemDao<Bundle, Meta> mySystemDao;
@Autowired
private IResourceReindexingSvc myResourceReindexingSvc;
@Before
@Transactional()
public void beforePurgeDatabase() {
purgeDatabase(myDaoConfig, mySystemDao, mySearchParamPresenceSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
purgeDatabase(myDaoConfig, mySystemDao, myResourceReindexingSvc, mySearchCoordinatorSvc, mySearchParamRegistry);
}
@Before

View File

@ -539,10 +539,9 @@ public class FhirResourceDaoR4TerminologyTest extends BaseJpaR4Test {
createExternalCsAndLocalVs();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(100);
mySystemDao.performReindexingPass(100);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myHapiTerminologySvc.saveDeferred();
myHapiTerminologySvc.saveDeferred();
myHapiTerminologySvc.saveDeferred();
@ -851,17 +850,17 @@ public class FhirResourceDaoR4TerminologyTest extends BaseJpaR4Test {
include.setSystem(URL_MY_CODE_SYSTEM);
include.addConcept().setCode("ZZZZ");
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(null);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myTermSvc.saveDeferred();
mySystemDao.performReindexingPass(null);
myTermSvc.saveDeferred();
// Again
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(null);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
myTermSvc.saveDeferred();
mySystemDao.performReindexingPass(null);
myTermSvc.saveDeferred();
}

View File

@ -53,7 +53,7 @@ import static org.apache.commons.lang3.StringUtils.defaultString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
@SuppressWarnings({"unchecked", "deprecation", "Duplicates"})
@ -162,6 +162,9 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
runInTransaction(() -> {
assertThat(myResourceIndexedSearchParamTokenDao.countForResourceId(id1.getIdPartAsLong()), greaterThan(0));
Optional<ResourceTable> tableOpt = myResourceTableDao.findById(id1.getIdPartAsLong());
assertTrue(tableOpt.isPresent());
assertEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXED, tableOpt.get().getIndexStatus().longValue());
});
runInTransaction(() -> {
@ -170,10 +173,16 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
ResourceTable table = tableOpt.get();
table.setIndexStatus(null);
table.setDeleted(new Date());
table = myResourceTableDao.saveAndFlush(table);
ResourceHistoryTable newHistory = table.toHistory();
ResourceHistoryTable currentHistory = myResourceHistoryTableDao.findForIdAndVersion(table.getId(), 1L);
newHistory.setEncoding(currentHistory.getEncoding());
newHistory.setResource(currentHistory.getResource());
myResourceHistoryTableDao.save(newHistory);
});
mySystemDao.performReindexingPass(1000);
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.runReindexingPass();
runInTransaction(() -> {
Optional<ResourceTable> tableOpt = myResourceTableDao.findById(id1.getIdPartAsLong());
@ -185,6 +194,48 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
}
@Test
public void testMissingVersionsAreReindexed() {
myDaoConfig.setSchedulingDisabled(true);
Patient pt1 = new Patient();
pt1.setActive(true);
pt1.addName().setFamily("FAM");
IIdType id1 = myPatientDao.create(pt1).getId().toUnqualifiedVersionless();
runInTransaction(() -> {
assertThat(myResourceIndexedSearchParamTokenDao.countForResourceId(id1.getIdPartAsLong()), greaterThan(0));
Optional<ResourceTable> tableOpt = myResourceTableDao.findById(id1.getIdPartAsLong());
assertTrue(tableOpt.isPresent());
assertEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXED, tableOpt.get().getIndexStatus().longValue());
});
/*
* This triggers a new version in the HFJ_RESOURCE table, but
* we do not create the corresponding entry in the HFJ_RES_VER
* table.
*/
runInTransaction(() -> {
Optional<ResourceTable> tableOpt = myResourceTableDao.findById(id1.getIdPartAsLong());
assertTrue(tableOpt.isPresent());
ResourceTable table = tableOpt.get();
table.setIndexStatus(null);
table.setDeleted(new Date());
myResourceTableDao.saveAndFlush(table);
});
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.runReindexingPass();
runInTransaction(() -> {
Optional<ResourceTable> tableOpt = myResourceTableDao.findById(id1.getIdPartAsLong());
assertTrue(tableOpt.isPresent());
assertEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED, tableOpt.get().getIndexStatus().longValue());
assertThat(myResourceIndexedSearchParamTokenDao.countForResourceId(id1.getIdPartAsLong()), not(greaterThan(0)));
});
}
@Test
public void testCantSearchForDeletedResourceByLanguageOrTag() {

View File

@ -449,9 +449,9 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
createUniqueObservationSubjectDateCode();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
List<ResourceIndexedCompositeStringUnique> uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());
@ -462,9 +462,9 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
assertEquals(1, mySearchParamRegsitry.getActiveUniqueSearchParams("Observation").size());
assertEquals(7, mySystemDao.markAllResourcesForReindexing());
mySystemDao.performReindexingPass(1000);
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());
@ -557,8 +557,9 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
createUniqueIndexCoverageBeneficiary();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
List<ResourceIndexedCompositeStringUnique> uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());
@ -1119,8 +1120,9 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
pt2.setActive(false);
myPatientDao.create(pt1).getId().toUnqualifiedVersionless();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
List<ResourceIndexedCompositeStringUnique> uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());
@ -1129,8 +1131,9 @@ public class FhirResourceDaoR4UniqueSearchParamTest extends BaseJpaR4Test {
myResourceIndexedCompositeStringUniqueDao.deleteAll();
mySystemDao.markAllResourcesForReindexing();
mySystemDao.performReindexingPass(1000);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
myResourceReindexingSvc.forceReindexingPass();
uniques = myResourceIndexedCompositeStringUniqueDao.findAll();
assertEquals(uniques.toString(), 1, uniques.size());

View File

@ -10,7 +10,7 @@ import org.junit.AfterClass;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
public class FhirResourceDaoR4UpdateTagSnapshotTest extends BaseJpaR4Test {

View File

@ -1,20 +1,5 @@
package ca.uhn.fhir.jpa.dao.r4;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import java.util.*;
import net.ttddyy.dsproxy.QueryCountHolder;
import org.hl7.fhir.r4.model.*;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.*;
import org.mockito.ArgumentCaptor;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.model.primitive.InstantDt;
@ -22,11 +7,30 @@ import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.StringParam;
import ca.uhn.fhir.rest.server.exceptions.*;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.test.context.TestPropertySource;
import java.util.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
@TestPropertySource(properties = {
"scheduling_disabled=true"
})

View File

@ -51,11 +51,6 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirSystemDaoR4Test.class);
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
@After
public void after() {
myDaoConfig.setAllowInlineMatchUrlReferences(false);
@ -175,7 +170,7 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
fail();
return null;
}
@Test
public void testTransactionReSavesPreviouslyDeletedResources() {
@ -238,7 +233,6 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
myPatientDao.read(new IdType("Patient/pt"));
}
@Test
public void testResourceCounts() {
@ -534,69 +528,43 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
vs.setUrl("http://foo");
myValueSetDao.create(vs, mySrd);
ResourceTable entity = new TransactionTemplate(myTxManager).execute(new TransactionCallback<ResourceTable>() {
@Override
public ResourceTable doInTransaction(TransactionStatus theStatus) {
return myEntityManager.find(ResourceTable.class, id.getIdPartAsLong());
}
});
ResourceTable entity = new TransactionTemplate(myTxManager).execute(t -> myEntityManager.find(ResourceTable.class, id.getIdPartAsLong()));
assertEquals(Long.valueOf(1), entity.getIndexStatus());
mySystemDao.markAllResourcesForReindexing();
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
entity = new TransactionTemplate(myTxManager).execute(new TransactionCallback<ResourceTable>() {
@Override
public ResourceTable doInTransaction(TransactionStatus theStatus) {
return myEntityManager.find(ResourceTable.class, id.getIdPartAsLong());
}
});
assertEquals(null, entity.getIndexStatus());
mySystemDao.performReindexingPass(null);
entity = new TransactionTemplate(myTxManager).execute(new TransactionCallback<ResourceTable>() {
@Override
public ResourceTable doInTransaction(TransactionStatus theStatus) {
return myEntityManager.find(ResourceTable.class, id.getIdPartAsLong());
}
});
entity = new TransactionTemplate(myTxManager).execute(t -> myEntityManager.find(ResourceTable.class, id.getIdPartAsLong()));
assertEquals(Long.valueOf(1), entity.getIndexStatus());
// Just make sure this doesn't cause a choke
mySystemDao.performReindexingPass(100000);
myResourceReindexingSvc.forceReindexingPass();
// Try making the resource unparseable
TransactionTemplate template = new TransactionTemplate(myTxManager);
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
template.execute(new TransactionCallback<ResourceTable>() {
@Override
public ResourceTable doInTransaction(TransactionStatus theStatus) {
ResourceHistoryTable resourceHistoryTable = myResourceHistoryTableDao.findForIdAndVersion(id.getIdPartAsLong(), id.getVersionIdPartAsLong());
resourceHistoryTable.setEncoding(ResourceEncodingEnum.JSON);
try {
resourceHistoryTable.setResource("{\"resourceType\":\"FOO\"}".getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new Error(e);
}
myResourceHistoryTableDao.save(resourceHistoryTable);
ResourceTable table = myResourceTableDao.findById(id.getIdPartAsLong()).orElseThrow(IllegalStateException::new);
table.setIndexStatus(null);
myResourceTableDao.save(table);
return null;
template.execute((TransactionCallback<ResourceTable>) t -> {
ResourceHistoryTable resourceHistoryTable = myResourceHistoryTableDao.findForIdAndVersion(id.getIdPartAsLong(), id.getVersionIdPartAsLong());
resourceHistoryTable.setEncoding(ResourceEncodingEnum.JSON);
try {
resourceHistoryTable.setResource("{\"resourceType\":\"FOO\"}".getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new Error(e);
}
myResourceHistoryTableDao.save(resourceHistoryTable);
ResourceTable table = myResourceTableDao.findById(id.getIdPartAsLong()).orElseThrow(IllegalStateException::new);
table.setIndexStatus(null);
myResourceTableDao.save(table);
return null;
});
mySystemDao.performReindexingPass(null);
myResourceReindexingSvc.markAllResourcesForReindexing();
myResourceReindexingSvc.forceReindexingPass();
entity = new TransactionTemplate(myTxManager).execute(new TransactionCallback<ResourceTable>() {
@Override
public ResourceTable doInTransaction(TransactionStatus theStatus) {
return myEntityManager.find(ResourceTable.class, id.getIdPartAsLong());
}
});
entity = new TransactionTemplate(myTxManager).execute(theStatus -> myEntityManager.find(ResourceTable.class, id.getIdPartAsLong()));
assertEquals(Long.valueOf(2), entity.getIndexStatus());
}
@ -3119,6 +3087,44 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
assertEquals(1, found.size().intValue());
}
@Test
public void testTransactionWithRelativeOidIds() {
Bundle res = new Bundle();
res.setType(BundleType.TRANSACTION);
Patient p1 = new Patient();
p1.setId("urn:oid:0.1.2.3");
p1.addIdentifier().setSystem("system").setValue("testTransactionWithRelativeOidIds01");
res.addEntry().setResource(p1).getRequest().setMethod(HTTPVerb.POST).setUrl("Patient");
Observation o1 = new Observation();
o1.addIdentifier().setSystem("system").setValue("testTransactionWithRelativeOidIds02");
o1.setSubject(new Reference("urn:oid:0.1.2.3"));
res.addEntry().setResource(o1).getRequest().setMethod(HTTPVerb.POST).setUrl("Observation");
Observation o2 = new Observation();
o2.addIdentifier().setSystem("system").setValue("testTransactionWithRelativeOidIds03");
o2.setSubject(new Reference("urn:oid:0.1.2.3"));
res.addEntry().setResource(o2).getRequest().setMethod(HTTPVerb.POST).setUrl("Observation");
Bundle resp = mySystemDao.transaction(mySrd, res);
ourLog.info(myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(resp));
assertEquals(BundleType.TRANSACTIONRESPONSE, resp.getTypeElement().getValue());
assertEquals(3, resp.getEntry().size());
assertTrue(resp.getEntry().get(0).getResponse().getLocation(), new IdType(resp.getEntry().get(0).getResponse().getLocation()).getIdPart().matches("^[0-9]+$"));
assertTrue(resp.getEntry().get(1).getResponse().getLocation(), new IdType(resp.getEntry().get(1).getResponse().getLocation()).getIdPart().matches("^[0-9]+$"));
assertTrue(resp.getEntry().get(2).getResponse().getLocation(), new IdType(resp.getEntry().get(2).getResponse().getLocation()).getIdPart().matches("^[0-9]+$"));
o1 = myObservationDao.read(new IdType(resp.getEntry().get(1).getResponse().getLocation()), mySrd);
o2 = myObservationDao.read(new IdType(resp.getEntry().get(2).getResponse().getLocation()), mySrd);
assertThat(o1.getSubject().getReferenceElement().getValue(), endsWith("Patient/" + p1.getIdElement().getIdPart()));
assertThat(o2.getSubject().getReferenceElement().getValue(), endsWith("Patient/" + p1.getIdElement().getIdPart()));
}
//
//
// /**
@ -3221,44 +3227,6 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
//
// }
@Test
public void testTransactionWithRelativeOidIds() {
Bundle res = new Bundle();
res.setType(BundleType.TRANSACTION);
Patient p1 = new Patient();
p1.setId("urn:oid:0.1.2.3");
p1.addIdentifier().setSystem("system").setValue("testTransactionWithRelativeOidIds01");
res.addEntry().setResource(p1).getRequest().setMethod(HTTPVerb.POST).setUrl("Patient");
Observation o1 = new Observation();
o1.addIdentifier().setSystem("system").setValue("testTransactionWithRelativeOidIds02");
o1.setSubject(new Reference("urn:oid:0.1.2.3"));
res.addEntry().setResource(o1).getRequest().setMethod(HTTPVerb.POST).setUrl("Observation");
Observation o2 = new Observation();
o2.addIdentifier().setSystem("system").setValue("testTransactionWithRelativeOidIds03");
o2.setSubject(new Reference("urn:oid:0.1.2.3"));
res.addEntry().setResource(o2).getRequest().setMethod(HTTPVerb.POST).setUrl("Observation");
Bundle resp = mySystemDao.transaction(mySrd, res);
ourLog.info(myFhirCtx.newXmlParser().setPrettyPrint(true).encodeResourceToString(resp));
assertEquals(BundleType.TRANSACTIONRESPONSE, resp.getTypeElement().getValue());
assertEquals(3, resp.getEntry().size());
assertTrue(resp.getEntry().get(0).getResponse().getLocation(), new IdType(resp.getEntry().get(0).getResponse().getLocation()).getIdPart().matches("^[0-9]+$"));
assertTrue(resp.getEntry().get(1).getResponse().getLocation(), new IdType(resp.getEntry().get(1).getResponse().getLocation()).getIdPart().matches("^[0-9]+$"));
assertTrue(resp.getEntry().get(2).getResponse().getLocation(), new IdType(resp.getEntry().get(2).getResponse().getLocation()).getIdPart().matches("^[0-9]+$"));
o1 = myObservationDao.read(new IdType(resp.getEntry().get(1).getResponse().getLocation()), mySrd);
o2 = myObservationDao.read(new IdType(resp.getEntry().get(2).getResponse().getLocation()), mySrd);
assertThat(o1.getSubject().getReferenceElement().getValue(), endsWith("Patient/" + p1.getIdElement().getIdPart()));
assertThat(o2.getSubject().getReferenceElement().getValue(), endsWith("Patient/" + p1.getIdElement().getIdPart()));
}
/**
* This is not the correct way to do it, but we'll allow it to be lenient
*/
@ -3471,4 +3439,9 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
}
@AfterClass
public static void afterClassClearContext() {
TestUtil.clearAllStaticFieldsForUnitTest();
}
}

View File

@ -7,6 +7,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.*;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -236,11 +237,11 @@ public class ResourceProviderCustomSearchParamDstu3Test extends BaseResourceProv
fooSp.setStatus(org.hl7.fhir.dstu3.model.Enumerations.PublicationStatus.ACTIVE);
mySearchParameterDao.create(fooSp, mySrd);
res = myResourceTableDao.findById(patId.getIdPartAsLong()).orElseThrow(IllegalStateException::new);
assertEquals(null, res.getIndexStatus());
res = myResourceTableDao.findById(obsId.getIdPartAsLong()).orElseThrow(IllegalStateException::new);
assertEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXED, res.getIndexStatus().longValue());
runInTransaction(()->{
List<ResourceReindexJobEntity> allJobs = myResourceReindexJobDao.findAll();
assertEquals(1, allJobs.size());
assertEquals("Patient", allJobs.get(0).getResourceType());
});
}
@Test

View File

@ -7,6 +7,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.*;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -236,10 +237,11 @@ public class ResourceProviderCustomSearchParamR4Test extends BaseResourceProvide
fooSp.setStatus(org.hl7.fhir.r4.model.Enumerations.PublicationStatus.ACTIVE);
mySearchParameterDao.create(fooSp, mySrd);
res = myResourceTableDao.findById(patId.getIdPartAsLong()).orElseThrow(IllegalStateException::new);
assertEquals(null, res.getIndexStatus());
res = myResourceTableDao.findById(obsId.getIdPartAsLong()).orElseThrow(IllegalStateException::new);
assertEquals(BaseHapiFhirDao.INDEX_STATUS_INDEXED, res.getIndexStatus().longValue());
runInTransaction(()->{
List<ResourceReindexJobEntity> allJobs = myResourceReindexJobDao.findAll();
assertEquals(1, allJobs.size());
assertEquals("Patient", allJobs.get(0).getResourceType());
});
}

View File

@ -193,7 +193,7 @@ public class SearchCoordinatorSvcImplTest {
});
when(mySearchDao.findByUuid(any())).thenAnswer(t -> myCurrentSearch);
IFhirResourceDao dao = myCallingDao;
when(myDaoRegistry.getResourceDao(any())).thenReturn(dao);
when(myDaoRegistry.getResourceDao(any(String.class))).thenReturn(dao);
resources = result.getResources(0, 100000);
assertEquals(790, resources.size());

View File

@ -0,0 +1,262 @@
package ca.uhn.fhir.jpa.search.reindex;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.dao.BaseJpaTest;
import ca.uhn.fhir.jpa.dao.DaoConfig;
import ca.uhn.fhir.jpa.dao.DaoRegistry;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IForcedIdDao;
import ca.uhn.fhir.jpa.dao.data.IResourceReindexJobDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.entity.ResourceReindexJobEntity;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.SliceImpl;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
public class ResourceReindexingSvcImplTest extends BaseJpaTest {
private static FhirContext ourCtx = FhirContext.forR4();
@Mock
private PlatformTransactionManager myTxManager;
private ResourceReindexingSvcImpl mySvc;
private DaoConfig myDaoConfig;
@Mock
private DaoRegistry myDaoRegistry;
@Mock
private IForcedIdDao myForcedIdDao;
@Mock
private IResourceReindexJobDao myReindexJobDao;
@Mock
private IResourceTableDao myResourceTableDao;
@Mock
private IFhirResourceDao myResourceDao;
@Captor
private ArgumentCaptor<Long> myIdCaptor;
@Captor
private ArgumentCaptor<PageRequest> myPageRequestCaptor;
@Captor
private ArgumentCaptor<String> myTypeCaptor;
@Captor
private ArgumentCaptor<Date> myLowCaptor;
@Captor
private ArgumentCaptor<Date> myHighCaptor;
private ResourceReindexJobEntity mySingleJob;
@Override
protected FhirContext getContext() {
return ourCtx;
}
@Override
protected PlatformTransactionManager getTxManager() {
return myTxManager;
}
@Before
public void before() {
myDaoConfig = new DaoConfig();
myDaoConfig.setReindexThreadCount(2);
mySvc = new ResourceReindexingSvcImpl();
mySvc.setContextForUnitTest(ourCtx);
mySvc.setDaoConfigForUnitTest(myDaoConfig);
mySvc.setDaoRegistryForUnitTest(myDaoRegistry);
mySvc.setForcedIdDaoForUnitTest(myForcedIdDao);
mySvc.setReindexJobDaoForUnitTest(myReindexJobDao);
mySvc.setResourceTableDaoForUnitTest(myResourceTableDao);
mySvc.setTxManagerForUnitTest(myTxManager);
mySvc.start();
}
@Test
public void testMarkJobsPastThresholdAsDeleted() {
mockNothingToExpunge();
mockSingleReindexingJob(null);
mockFourResourcesNeedReindexing();
mockFetchFourResources();
mySingleJob.setThresholdHigh(DateUtils.addMinutes(new Date(), -1));
mySvc.forceReindexingPass();
verify(myResourceTableDao, never()).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(), any(), any());
verify(myResourceTableDao, never()).findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(any(), any(), any(), any());
verify(myReindexJobDao, times(1)).markAsDeletedById(myIdCaptor.capture());
assertEquals(123L, myIdCaptor.getValue().longValue());
}
@Test
public void testExpungeDeletedJobs() {
ResourceReindexJobEntity job = new ResourceReindexJobEntity();
job.setIdForUnitTest(123L);
job.setDeleted(true);
when(myReindexJobDao.findAll(any(), eq(true))).thenReturn(Arrays.asList(job));
mySvc.forceReindexingPass();
verify(myReindexJobDao, times(1)).deleteById(eq(123L));
}
@Test
public void testReindexPassAllResources() {
mockNothingToExpunge();
mockSingleReindexingJob(null);
mockFourResourcesNeedReindexing();
mockFetchFourResources();
int count = mySvc.forceReindexingPass();
assertEquals(4, count);
// Make sure we reindexed all 4 resources
verify(myResourceDao, times(4)).reindex(any(), any());
// Make sure we updated the low threshold
verify(myReindexJobDao, times(1)).setThresholdLow(myIdCaptor.capture(), myLowCaptor.capture());
assertEquals(123L, myIdCaptor.getValue().longValue());
assertEquals(40 * DateUtils.MILLIS_PER_DAY, myLowCaptor.getValue().getTime());
// Make sure we didn't do anything unexpected
verify(myReindexJobDao, times(1)).findAll(any(), eq(false));
verify(myReindexJobDao, times(1)).findAll(any(), eq(true));
verifyNoMoreInteractions(myReindexJobDao);
}
@Test
public void testReindexPassPatients() {
mockNothingToExpunge();
mockSingleReindexingJob("Patient");
// Mock resource fetch
List<Long> values = Arrays.asList(0L, 1L, 2L, 3L);
when(myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(myPageRequestCaptor.capture(), myTypeCaptor.capture(), myLowCaptor.capture(), myHighCaptor.capture())).thenReturn(new SliceImpl<>(values));
// Mock fetching resources
long[] updatedTimes = new long[]{
10 * DateUtils.MILLIS_PER_DAY,
20 * DateUtils.MILLIS_PER_DAY,
40 * DateUtils.MILLIS_PER_DAY,
30 * DateUtils.MILLIS_PER_DAY,
};
String[] resourceTypes = new String[]{
"Patient",
"Patient",
"Patient",
"Patient"
};
List<IBaseResource> resources = Arrays.asList(
new Patient().setId("Patient/0"),
new Patient().setId("Patient/1"),
new Patient().setId("Patient/2"),
new Patient().setId("Patient/3")
);
mockWhenResourceTableFindById(updatedTimes, resourceTypes);
when(myDaoRegistry.getResourceDao(eq("Patient"))).thenReturn(myResourceDao);
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
when(myResourceDao.toResource(any(), anyBoolean())).thenAnswer(t -> {
ResourceTable table = (ResourceTable) t.getArguments()[0];
Long id = table.getId();
return resources.get(id.intValue());
});
int count = mySvc.forceReindexingPass();
assertEquals(4, count);
// Make sure we reindexed all 4 resources
verify(myResourceDao, times(4)).reindex(any(), any());
// Make sure we updated the low threshold
verify(myReindexJobDao, times(1)).setThresholdLow(myIdCaptor.capture(), myLowCaptor.capture());
assertEquals(123L, myIdCaptor.getValue().longValue());
assertEquals(40 * DateUtils.MILLIS_PER_DAY, myLowCaptor.getValue().getTime());
// Make sure we didn't do anything unexpected
verify(myReindexJobDao, times(1)).findAll(any(), eq(false));
verify(myReindexJobDao, times(1)).findAll(any(), eq(true));
verifyNoMoreInteractions(myReindexJobDao);
}
private void mockWhenResourceTableFindById(long[] theUpdatedTimes, String[] theResourceTypes) {
when(myResourceTableDao.findById(any())).thenAnswer(t -> {
ResourceTable retVal = new ResourceTable();
Long id = (Long) t.getArguments()[0];
retVal.setId(id);
retVal.setResourceType(theResourceTypes[id.intValue()]);
retVal.setUpdated(new Date(theUpdatedTimes[id.intValue()]));
return Optional.of(retVal);
});
}
private void mockFetchFourResources() {
// Mock fetching resources
long[] updatedTimes = new long[]{
10 * DateUtils.MILLIS_PER_DAY,
20 * DateUtils.MILLIS_PER_DAY,
40 * DateUtils.MILLIS_PER_DAY,
30 * DateUtils.MILLIS_PER_DAY,
};
String[] resourceTypes = new String[]{
"Patient",
"Patient",
"Observation",
"Observation"
};
List<IBaseResource> resources = Arrays.asList(
new Patient().setId("Patient/0"),
new Patient().setId("Patient/1"),
new Observation().setId("Observation/2"),
new Observation().setId("Observation/3")
);
mockWhenResourceTableFindById(updatedTimes, resourceTypes);
when(myDaoRegistry.getResourceDao(eq("Patient"))).thenReturn(myResourceDao);
when(myDaoRegistry.getResourceDao(eq(Patient.class))).thenReturn(myResourceDao);
when(myDaoRegistry.getResourceDao(eq("Observation"))).thenReturn(myResourceDao);
when(myDaoRegistry.getResourceDao(eq(Observation.class))).thenReturn(myResourceDao);
when(myResourceDao.toResource(any(), anyBoolean())).thenAnswer(t -> {
ResourceTable table = (ResourceTable) t.getArguments()[0];
Long id = table.getId();
return resources.get(id.intValue());
});
}
private void mockFourResourcesNeedReindexing() {
// Mock resource fetch
List<Long> values = Arrays.asList(0L, 1L, 2L, 3L);
when(myResourceTableDao.findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(myPageRequestCaptor.capture(), myLowCaptor.capture(), myHighCaptor.capture())).thenReturn(new SliceImpl<>(values));
}
private void mockSingleReindexingJob(String theResourceType) {
// Mock the reindexing job
mySingleJob = new ResourceReindexJobEntity();
mySingleJob.setIdForUnitTest(123L);
mySingleJob.setThresholdHigh(DateUtils.addMinutes(new Date(), 1));
mySingleJob.setResourceType(theResourceType);
when(myReindexJobDao.findAll(any(), eq(false))).thenReturn(Arrays.asList(mySingleJob));
}
private void mockNothingToExpunge() {
// Nothing to expunge
when(myReindexJobDao.findAll(any(), eq(true))).thenReturn(new ArrayList<>());
}
}

View File

@ -550,14 +550,6 @@ public class TerminologySvcImplDstu3Test extends BaseJpaDstu3Test {
assertEquals("D1V", concept.getDesignation().get(0).getValue());
}
@Test
public void testReindexTerminology() {
IIdType id = createCodeSystem();
assertThat(mySystemDao.markAllResourcesForReindexing(), greaterThan(0));
assertThat(mySystemDao.performReindexingPass(100), greaterThan(0));
}
@Test
public void testStoreCodeSystemInvalidCyclicLoop() {

View File

@ -77,7 +77,7 @@ public enum DriverTypeEnum {
BasicDataSource dataSource = new BasicDataSource(){
@Override
public Connection getConnection() throws SQLException {
ourLog.info("Creating new DB connection");
ourLog.debug("Creating new DB connection");
return super.getConnection();
}
};

View File

@ -92,4 +92,5 @@ public class Migrator {
ourLog.info("Finished migration of {} tasks", myTasks.size());
}
}

View File

@ -9,9 +9,9 @@ package ca.uhn.fhir.jpa.migrate.tasks;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -30,13 +30,26 @@ import ca.uhn.fhir.jpa.migrate.taskdef.CalculateHashesTask;
import ca.uhn.fhir.jpa.migrate.tasks.api.BaseMigrationTasks;
import ca.uhn.fhir.util.VersionEnum;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@SuppressWarnings({"UnstableApiUsage", "SqlNoDataSourceInspection", "SpellCheckingInspection"})
public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
private final Set<FlagEnum> myFlags;
/**
* Constructor
*/
public HapiFhirJpaMigrationTasks() {
public HapiFhirJpaMigrationTasks(Set<String> theFlags) {
myFlags = theFlags
.stream()
.map(FlagEnum::fromCommandLineValue)
.collect(Collectors.toSet());
init340();
init350();
init360();
@ -60,6 +73,15 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.addColumn("OPTLOCK_VERSION")
.nullable()
.type(BaseTableColumnTypeTask.ColumnTypeEnum.INT);
version.addTable("HFJ_RES_REINDEX_JOB")
.addSql(DriverTypeEnum.MSSQL_2012, "create table HFJ_RES_REINDEX_JOB (PID bigint not null, JOB_DELETED bit not null, RES_TYPE varchar(255), SUSPENDED_UNTIL datetime2, UPDATE_THRESHOLD_HIGH datetime2 not null, UPDATE_THRESHOLD_LOW datetime2, primary key (PID))")
.addSql(DriverTypeEnum.DERBY_EMBEDDED, "create table HFJ_RES_REINDEX_JOB (PID bigint not null, JOB_DELETED boolean not null, RES_TYPE varchar(255), SUSPENDED_UNTIL timestamp, UPDATE_THRESHOLD_HIGH timestamp not null, UPDATE_THRESHOLD_LOW timestamp, primary key (PID))")
.addSql(DriverTypeEnum.MARIADB_10_1, "create table HFJ_RES_REINDEX_JOB (PID bigint not null, JOB_DELETED bit not null, RES_TYPE varchar(255), SUSPENDED_UNTIL datetime(6), UPDATE_THRESHOLD_HIGH datetime(6) not null, UPDATE_THRESHOLD_LOW datetime(6), primary key (PID))")
.addSql(DriverTypeEnum.POSTGRES_9_4, "persistence_create_postgres94.sql:create table HFJ_RES_REINDEX_JOB (PID int8 not null, JOB_DELETED boolean not null, RES_TYPE varchar(255), SUSPENDED_UNTIL timestamp, UPDATE_THRESHOLD_HIGH timestamp not null, UPDATE_THRESHOLD_LOW timestamp, primary key (PID))")
.addSql(DriverTypeEnum.MYSQL_5_7, " create table HFJ_RES_REINDEX_JOB (PID bigint not null, JOB_DELETED bit not null, RES_TYPE varchar(255), SUSPENDED_UNTIL datetime(6), UPDATE_THRESHOLD_HIGH datetime(6) not null, UPDATE_THRESHOLD_LOW datetime(6), primary key (PID))")
.addSql(DriverTypeEnum.ORACLE_12C, "create table HFJ_RES_REINDEX_JOB (PID number(19,0) not null, JOB_DELETED number(1,0) not null, RES_TYPE varchar2(255 char), SUSPENDED_UNTIL timestamp, UPDATE_THRESHOLD_HIGH timestamp not null, UPDATE_THRESHOLD_LOW timestamp, primary key (PID))");
}
private void init350() {
@ -80,65 +102,69 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
// Indexes - Coords
Builder.BuilderWithTableName spidxCoords = version.onTable("HFJ_SPIDX_COORDS");
version.startSectionWithMessage("Starting work on table: " + spidxCoords.getTableName());
spidxCoords
.dropIndex("IDX_SP_COORDS");
spidxCoords
.addColumn("HASH_IDENTITY")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxCoords
.addIndex("IDX_SP_COORDS_HASH")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_LATITUDE", "SP_LONGITUDE");
spidxCoords
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxCoords
.dropIndex("IDX_SP_COORDS");
spidxCoords
.addIndex("IDX_SP_COORDS_HASH")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_LATITUDE", "SP_LONGITUDE");
spidxCoords
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
);
}
// Indexes - Date
Builder.BuilderWithTableName spidxDate = version.onTable("HFJ_SPIDX_DATE");
version.startSectionWithMessage("Starting work on table: " + spidxDate.getTableName());
spidxDate
.dropIndex("IDX_SP_TOKEN");
spidxDate
.addColumn("HASH_IDENTITY")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxDate
.addIndex("IDX_SP_DATE_HASH")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_VALUE_LOW", "SP_VALUE_HIGH");
spidxDate
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxDate
.dropIndex("IDX_SP_TOKEN");
spidxDate
.addIndex("IDX_SP_DATE_HASH")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_VALUE_LOW", "SP_VALUE_HIGH");
spidxDate
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
);
}
// Indexes - Number
Builder.BuilderWithTableName spidxNumber = version.onTable("HFJ_SPIDX_NUMBER");
version.startSectionWithMessage("Starting work on table: " + spidxNumber.getTableName());
spidxNumber
.dropIndex("IDX_SP_NUMBER");
spidxNumber
.addColumn("HASH_IDENTITY")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxNumber
.addIndex("IDX_SP_NUMBER_HASH_VAL")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_VALUE");
spidxNumber
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxNumber
.dropIndex("IDX_SP_NUMBER");
spidxNumber
.addIndex("IDX_SP_NUMBER_HASH_VAL")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_VALUE");
spidxNumber
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
);
}
// Indexes - Quantity
Builder.BuilderWithTableName spidxQuantity = version.onTable("HFJ_SPIDX_QUANTITY");
version.startSectionWithMessage("Starting work on table: " + spidxQuantity.getTableName());
spidxQuantity
.dropIndex("IDX_SP_QUANTITY");
spidxQuantity
.addColumn("HASH_IDENTITY")
.nullable()
@ -151,61 +177,63 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.addColumn("HASH_IDENTITY_AND_UNITS")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxQuantity
.addIndex("IDX_SP_QUANTITY_HASH")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_VALUE");
spidxQuantity
.addIndex("IDX_SP_QUANTITY_HASH_UN")
.unique(false)
.withColumns("HASH_IDENTITY_AND_UNITS", "SP_VALUE");
spidxQuantity
.addIndex("IDX_SP_QUANTITY_HASH_SYSUN")
.unique(false)
.withColumns("HASH_IDENTITY_SYS_UNITS", "SP_VALUE");
spidxQuantity
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("HASH_IDENTITY_AND_UNITS", t -> ResourceIndexedSearchParamQuantity.calculateHashUnits(t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_UNITS")))
.addCalculator("HASH_IDENTITY_SYS_UNITS", t -> ResourceIndexedSearchParamQuantity.calculateHashSystemAndUnits(t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_SYSTEM"), t.getString("SP_UNITS")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxQuantity
.dropIndex("IDX_SP_QUANTITY");
spidxQuantity
.addIndex("IDX_SP_QUANTITY_HASH")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_VALUE");
spidxQuantity
.addIndex("IDX_SP_QUANTITY_HASH_UN")
.unique(false)
.withColumns("HASH_IDENTITY_AND_UNITS", "SP_VALUE");
spidxQuantity
.addIndex("IDX_SP_QUANTITY_HASH_SYSUN")
.unique(false)
.withColumns("HASH_IDENTITY_SYS_UNITS", "SP_VALUE");
spidxQuantity
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("HASH_IDENTITY_AND_UNITS", t -> ResourceIndexedSearchParamQuantity.calculateHashUnits(t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_UNITS")))
.addCalculator("HASH_IDENTITY_SYS_UNITS", t -> ResourceIndexedSearchParamQuantity.calculateHashSystemAndUnits(t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_SYSTEM"), t.getString("SP_UNITS")))
);
}
// Indexes - String
Builder.BuilderWithTableName spidxString = version.onTable("HFJ_SPIDX_STRING");
version.startSectionWithMessage("Starting work on table: " + spidxString.getTableName());
spidxString
.dropIndex("IDX_SP_STRING");
spidxString
.addColumn("HASH_NORM_PREFIX")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxString
.addIndex("IDX_SP_STRING_HASH_NRM")
.unique(false)
.withColumns("HASH_NORM_PREFIX", "SP_VALUE_NORMALIZED");
spidxString
.addColumn("HASH_EXACT")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxString
.addIndex("IDX_SP_STRING_HASH_EXCT")
.unique(false)
.withColumns("HASH_EXACT");
spidxString
.addTask(new CalculateHashesTask()
.setColumnName("HASH_NORM_PREFIX")
.addCalculator("HASH_NORM_PREFIX", t -> ResourceIndexedSearchParamString.calculateHashNormalized(new DaoConfig(), t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_VALUE_NORMALIZED")))
.addCalculator("HASH_EXACT", t -> ResourceIndexedSearchParamString.calculateHashExact(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE_EXACT")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxString
.dropIndex("IDX_SP_STRING");
spidxString
.addIndex("IDX_SP_STRING_HASH_NRM")
.unique(false)
.withColumns("HASH_NORM_PREFIX", "SP_VALUE_NORMALIZED");
spidxString
.addColumn("HASH_EXACT")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxString
.addIndex("IDX_SP_STRING_HASH_EXCT")
.unique(false)
.withColumns("HASH_EXACT");
spidxString
.addTask(new CalculateHashesTask()
.setColumnName("HASH_NORM_PREFIX")
.addCalculator("HASH_NORM_PREFIX", t -> ResourceIndexedSearchParamString.calculateHashNormalized(new DaoConfig(), t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_VALUE_NORMALIZED")))
.addCalculator("HASH_EXACT", t -> ResourceIndexedSearchParamString.calculateHashExact(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE_EXACT")))
);
}
// Indexes - Token
Builder.BuilderWithTableName spidxToken = version.onTable("HFJ_SPIDX_TOKEN");
version.startSectionWithMessage("Starting work on table: " + spidxToken.getTableName());
spidxToken
.dropIndex("IDX_SP_TOKEN");
spidxToken
.dropIndex("IDX_SP_TOKEN_UNQUAL");
spidxToken
.addColumn("HASH_IDENTITY")
.nullable()
@ -222,30 +250,36 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.addColumn("HASH_VALUE")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxToken
.addIndex("IDX_SP_TOKEN_HASH")
.unique(false)
.withColumns("HASH_IDENTITY");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH_S")
.unique(false)
.withColumns("HASH_SYS");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH_SV")
.unique(false)
.withColumns("HASH_SYS_AND_VALUE");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH_V")
.unique(false)
.withColumns("HASH_VALUE");
spidxToken
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("HASH_SYS", t -> ResourceIndexedSearchParamToken.calculateHashSystem(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM")))
.addCalculator("HASH_SYS_AND_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashSystemAndValue(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM"), t.getString("SP_VALUE")))
.addCalculator("HASH_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashValue(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxToken
.dropIndex("IDX_SP_TOKEN");
spidxToken
.dropIndex("IDX_SP_TOKEN_UNQUAL");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH")
.unique(false)
.withColumns("HASH_IDENTITY");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH_S")
.unique(false)
.withColumns("HASH_SYS");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH_SV")
.unique(false)
.withColumns("HASH_SYS_AND_VALUE");
spidxToken
.addIndex("IDX_SP_TOKEN_HASH_V")
.unique(false)
.withColumns("HASH_VALUE");
spidxToken
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("HASH_SYS", t -> ResourceIndexedSearchParamToken.calculateHashSystem(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM")))
.addCalculator("HASH_SYS_AND_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashSystemAndValue(t.getResourceType(), t.getParamName(), t.getString("SP_SYSTEM"), t.getString("SP_VALUE")))
.addCalculator("HASH_VALUE", t -> ResourceIndexedSearchParamToken.calculateHashValue(t.getResourceType(), t.getParamName(), t.getString("SP_VALUE")))
);
}
// Indexes - URI
Builder.BuilderWithTableName spidxUri = version.onTable("HFJ_SPIDX_URI");
@ -254,24 +288,26 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
.addColumn("HASH_IDENTITY")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxUri
.addIndex("IDX_SP_URI_HASH_IDENTITY")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_URI");
spidxUri
.addColumn("HASH_URI")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxUri
.addIndex("IDX_SP_URI_HASH_URI")
.unique(false)
.withColumns("HASH_URI");
spidxUri
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("HASH_URI", t -> ResourceIndexedSearchParamUri.calculateHashUri(t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_URI")))
);
if (!myFlags.contains(FlagEnum.NO_MIGRATE_HASHES)) {
spidxUri
.addIndex("IDX_SP_URI_HASH_IDENTITY")
.unique(false)
.withColumns("HASH_IDENTITY", "SP_URI");
spidxUri
.addColumn("HASH_URI")
.nullable()
.type(AddColumnTask.ColumnTypeEnum.LONG);
spidxUri
.addIndex("IDX_SP_URI_HASH_URI")
.unique(false)
.withColumns("HASH_URI");
spidxUri
.addTask(new CalculateHashesTask()
.setColumnName("HASH_IDENTITY")
.addCalculator("HASH_IDENTITY", t -> BaseResourceIndexedSearchParam.calculateHashIdentity(t.getResourceType(), t.getString("SP_NAME")))
.addCalculator("HASH_URI", t -> ResourceIndexedSearchParamUri.calculateHashUri(t.getResourceType(), t.getString("SP_NAME"), t.getString("SP_URI")))
);
}
// Search Parameter Presence
Builder.BuilderWithTableName spp = version.onTable("HFJ_RES_PARAM_PRESENT");
@ -492,5 +528,27 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
}
public enum FlagEnum {
NO_MIGRATE_HASHES("no-migrate-350-hashes");
private final String myCommandLineValue;
FlagEnum(String theCommandLineValue) {
myCommandLineValue = theCommandLineValue;
}
public String getCommandLineValue() {
return myCommandLineValue;
}
public static FlagEnum fromCommandLineValue(String theCommandLineValue) {
Optional<FlagEnum> retVal = Arrays.stream(values()).filter(t -> t.myCommandLineValue.equals(theCommandLineValue)).findFirst();
return retVal.orElseThrow(() -> {
List<String> validValues = Arrays.stream(values()).map(t -> t.myCommandLineValue).sorted().collect(Collectors.toList());
return new IllegalArgumentException("Invalid flag \"" + theCommandLineValue + "\". Valid values: " + validValues);
});
}
}
}

View File

@ -2,11 +2,13 @@ package ca.uhn.fhir.jpa.migrate.tasks;
import org.junit.Test;
import java.util.Collections;
public class HapiFhirJpaMigrationTasksTest {
@Test
public void testCreate() {
new HapiFhirJpaMigrationTasks();
new HapiFhirJpaMigrationTasks(Collections.emptySet());
}

View File

@ -2066,7 +2066,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>2.8.1</version>
<version>3.0.0</version>
<inherited>false</inherited>
<reportSets>
<reportSet>

View File

@ -142,42 +142,16 @@ Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)]]></pre>
<section name="Migrate Database">
<p>
When upgrading the JPA server from one version of HAPI FHIR to a newer version,
often there will be changes to the database schema. The <b>Migrate Database</b>
command can be used to perform a migration from one version to the next.
</p>
<p>
Note that this feature was added in HAPI FHIR 3.5.0. It is not able to migrate
from versions prior to HAPI FHIR 3.4.0. <b>Please make a backup of your
database before running this command!</b>
</p>
<p>
The following example shows how to use the migrator utility to migrate between two versions.
</p>
<pre>./hapi-fhir-cli migrate-database -d DERBY_EMBEDDED -u "jdbc:derby:directory:target/jpaserver_derby_files;create=true" -n "" -p "" -f V3_4_0 -t V3_5_0</pre>
<p>
You may use the following command to get detailed help on the options:
</p>
<pre>./hapi-fhir-cli help migrate-database</pre>
<p>
Note the arguments:
<ul>
<li><code>-d [dialect]</code> - This indicates the database dialect to use. See the detailed help for a list of options</li>
<li><code>-f [version]</code> - The version to migrate from</li>
<li><code>-t [version]</code> - The version to migrate to</li>
</ul>
The <code>migrate-database</code> command may be used to Migrate a database
schema when upgrading a
<a href="./doc_jpa.html">HAPI FHIR JPA</a> project from one version of HAPI
FHIR to another version.
</p>
<subsection name="Oracle Support">
<p>
Note that the Oracle JDBC drivers are not distributed in the Maven Central repository,
so they are not included in HAPI FHIR. In order to use this command with an Oracle database,
you will need to invoke the CLI as follows:
</p>
<pre>java -cp hapi-fhir-cli.jar ca.uhn.fhir.cli.App migrate-database -d ORACLE_12C -u "[url]" -n "[username]" -p "[password]" -f V3_4_0 -t V3_5_0</pre>
</subsection>
<p>
See <a href="./doc_jpa.html#upgrading">Upgrading HAPI FHIR JPA</a>
for information on how to use this command.
</p>
</section>

View File

@ -366,7 +366,143 @@ delete from hfj_res_ver where res_id in (select res_id from hfj_resource where s
-->
<a name="upgrading"/>
<section name="Upgrading HAPI FHIR JPA">
<p>
HAPI FHIR JPA is a constantly evolving product, with new features being added to each
new version of the library. As a result, it is generally necessary to execute a database
migration as a part of an upgrade to HAPI FHIR.
</p>
<p>
When upgrading the JPA server from one version of HAPI FHIR to a newer version,
often there will be changes to the database schema. The <b>Migrate Database</b>
command can be used to perform a migration from one version to the next.
</p>
<p>
Note that this feature was added in HAPI FHIR 3.5.0. It is not able to migrate
from versions prior to HAPI FHIR 3.4.0. <b>Please make a backup of your
database before running this command!</b>
</p>
<p>
The following example shows how to use the migrator utility to migrate between two versions.
</p>
<pre>./hapi-fhir-cli migrate-database -d DERBY_EMBEDDED -u "jdbc:derby:directory:target/jpaserver_derby_files;create=true" -n "" -p "" -f V3_4_0 -t V3_5_0</pre>
<p>
You may use the following command to get detailed help on the options:
</p>
<pre>./hapi-fhir-cli help migrate-database</pre>
<p>
Note the arguments:
<ul>
<li><code>-d [dialect]</code> - This indicates the database dialect to use. See the detailed help for a list of options</li>
<li><code>-f [version]</code> - The version to migrate from</li>
<li><code>-t [version]</code> - The version to migrate to</li>
</ul>
</p>
<subsection name="Oracle Support">
<p>
Note that the Oracle JDBC drivers are not distributed in the Maven Central repository,
so they are not included in HAPI FHIR. In order to use this command with an Oracle database,
you will need to invoke the CLI as follows:
</p>
<pre>java -cp hapi-fhir-cli.jar ca.uhn.fhir.cli.App migrate-database -d ORACLE_12C -u "[url]" -n "[username]" -p "[password]" -f V3_4_0 -t V3_5_0</pre>
</subsection>
<subsection name="Migrating 3.4.0 to 3.5.0+">
<p>
As of HAPI FHIR 3.5.0 a new mechanism for creating the JPA index tables (HFJ_SPIDX_xxx)
has been implemented. This new mechanism uses hashes in place of large multi-column
indexes. This improves both lookup times as well as required storage space. This change
also paves the way for future ability to provide efficient multi-tenant searches (which
is not yet implemented but is planned as an incremental improvement).
</p>
<p>
This change is not a lightweight change however, as it requires a rebuild of the
index tables in order to generate the hashes. This can take a long time on databases
that already have a large amount of data.
</p>
<p>
As a result, in HAPI FHIR JPA 3.6.0, an efficient way of upgrading existing databases
was added. Under this new scheme, columns for the hashes are added but values are not
calculated initially, database indexes are not modified on the HFJ_SPIDX_xxx tables,
and the previous columns are still used for searching as was the case in HAPI FHIR
JPA 3.4.0.
</p>
<p>
In order to perform a migration using this functionality, the following steps should
be followed:
</p>
<ul>
<li>
Stop your running HAPI FHIR JPA instance (and remember to make a backup of your
database before proceeding with any changes!)
</li>
<li>
Modify your <code>DaoConfig</code> to specify that hash-based searches should not be used, using
the following setting:<br/>
<pre>myDaoConfig.setDisableHashBasedSearches(true);</pre>
</li>
<li>
Make sure that you have your JPA settings configured to not automatically
create database indexes and columns using the following setting
in your JPA Properties:<br/>
<pre>extraProperties.put("hibernate.hbm2ddl.auto", "none");</pre>
</li>
<li>
Run the database migrator command, including the entry <code>-x no-migrate-350-hashes</code>
on the command line. For example:<br/>
<pre>./hapi-fhir-cli migrate-database -d DERBY_EMBEDDED -u "jdbc:derby:directory:target/jpaserver_derby_files;create=true" -n "" -p "" -f V3_4_0 -t V3_6_0 -x no-migrate-350-hashes</pre>
</li>
<li>
Rebuild and start your HAPI FHIR JPA server. At this point you should have a working
HAPI FHIR JPA 3.6.0 server that is is still using HAPI FHIR 3.4.0 search indexes. Search hashes
will be generated for any newly created or updated data but existing data will have null
hashes.
</li>
<li>
With the system running, request a complete reindex of the data in the database using
an HTTP request such as the following:<br/>
<pre>GET /$mark-all-resources-for-reindexing</pre>
Note that this is a custom operation built into the HAPI FHIR JPA server. It should
be secured in a real deployment, so Authentication is likely required for this
call.
</li>
<li>
You can track the reindexing process by watching your server logs,
but also by using the following SQL executed directly against your database:
<br/>
<pre>SELECT * FROM HFJ_RES_REINDEX_JOB</pre>
When this query no longer returns any rows, the reindexing process is complete.
</li>
<li>
At this time, HAPI FHIR should be stopped once again in order to convert it
to using the hash based indexes.
</li>
<li>
Modify your <code>DaoConfig</code> to specify that hash-based searches are used, using
the following setting (this is the default setting, so it could also simply
be omitted):<br/>
<pre>myDaoConfig.setDisableHashBasedSearches(false);</pre>
</li>
<li>
Execute the migrator tool again, this time omitting the flag option, e.g.<br/>
<pre>./hapi-fhir-cli migrate-database -d DERBY_EMBEDDED -u "jdbc:derby:directory:target/jpaserver_derby_files;create=true" -n "" -p "" -f V3_4_0 -t V3_6_0</pre>
</li>
<li>
Rebuild, and start HAPI FHIR JPA again.
</li>
</ul>
</subsection>
</section>
</body>
</document>