FIx broken reindex process

This commit is contained in:
James Agnew 2018-05-23 10:48:07 -04:00
parent 2e74ad72cb
commit 0c5c347db7
10 changed files with 191 additions and 102 deletions

View File

@ -33,7 +33,6 @@ import ca.uhn.fhir.jpa.util.ReindexController;
import org.hibernate.jpa.HibernatePersistenceProvider;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
@ -51,7 +50,6 @@ import org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import javax.annotation.Nonnull;
import javax.annotation.Resource;
@Configuration
@EnableScheduling
@ -62,8 +60,6 @@ public abstract class BaseConfig implements SchedulingConfigurer {
@Autowired
protected Environment myEnv;
@Resource
private ApplicationContext myAppCtx;
@Override
public void configureTasks(@Nonnull ScheduledTaskRegistrar theTaskRegistrar) {
@ -94,12 +90,12 @@ public abstract class BaseConfig implements SchedulingConfigurer {
}
@Bean
public HibernateJpaDialect hibernateJpaDialectIntance() {
public HibernateJpaDialect hibernateJpaDialectInstance() {
return new HibernateJpaDialect();
}
@Bean
private IReindexController reindexController() {
public IReindexController reindexController() {
return new ReindexController();
}

View File

@ -32,7 +32,7 @@ import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
import ca.uhn.fhir.jpa.util.DeleteConflict;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.jpa.util.ReindexController;
import ca.uhn.fhir.jpa.util.IReindexController;
import ca.uhn.fhir.jpa.util.jsonpatch.JsonPatchUtils;
import ca.uhn.fhir.jpa.util.xmlpatch.XmlPatchUtils;
import ca.uhn.fhir.model.api.*;
@ -90,7 +90,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Autowired
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private ReindexController myReindexController;
private IReindexController myReindexController;
@Override
public void addTag(IIdType theId, TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) {

View File

@ -5,27 +5,35 @@ import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.dao.data.ITermConceptDao;
import ca.uhn.fhir.jpa.entity.ForcedId;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import ca.uhn.fhir.jpa.util.*;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.jpa.util.ExpungeOptions;
import ca.uhn.fhir.jpa.util.ExpungeOutcome;
import ca.uhn.fhir.jpa.util.ReindexFailureException;
import ca.uhn.fhir.jpa.util.ResourceCountCache;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.interceptor.IServerInterceptor.ActionRequestDetails;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hibernate.search.util.impl.Executors;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -39,9 +47,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -53,12 +61,12 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBaseResource> implements IFhirSystemDao<T, MT> {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(BaseHapiFhirSystemDao.class);
@Autowired
@Qualifier("myResourceCountsCache")
public ResourceCountCache myResourceCountsCache;
@Autowired
private IForcedIdDao myForcedIdDao;
private ReentrantLock myReindexLock = new ReentrantLock(false);
@Autowired
private ITermConceptDao myTermConceptDao;
@Autowired
@ -67,10 +75,7 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
private PlatformTransactionManager myTxManager;
@Autowired
private IResourceTableDao myResourceTableDao;
@Autowired
@Qualifier("myResourceCountsCache")
public ResourceCountCache myResourceCountsCache;
private ThreadFactory myReindexingThreadFactory = new BasicThreadFactory.Builder().namingPattern("ResourceReindex-%d").build();
private int doPerformReindexingPass(final Integer theCount) {
/*
@ -87,70 +92,53 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
@SuppressWarnings("ConstantConditions")
private int doPerformReindexingPassForResources(final Integer theCount, TransactionTemplate txTemplate) {
return txTemplate.execute(new TransactionCallback<Integer>() {
@SuppressWarnings("unchecked")
@Override
public Integer doInTransaction(@Nonnull TransactionStatus theStatus) {
int maxResult = 500;
if (theCount != null) {
maxResult = Math.min(theCount, 2000);
}
maxResult = Math.max(maxResult, 10);
TypedQuery<Long> q = myEntityManager.createQuery("SELECT t.myId FROM ResourceTable t WHERE t.myIndexStatus IS NULL", Long.class);
ourLog.debug("Beginning indexing query with maximum {}", maxResult);
q.setMaxResults(maxResult);
Collection<Long> resources = q.getResultList();
int count = 0;
long start = System.currentTimeMillis();
for (Long nextId : resources) {
ResourceTable resourceTable = myResourceTableDao.findOne(nextId);
try {
/*
* This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
*/
ForcedId forcedId = resourceTable.getForcedId();
if (forcedId != null) {
if (isBlank(forcedId.getResourceType())) {
ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
forcedId.setResourceType(resourceTable.getResourceType());
myForcedIdDao.save(forcedId);
}
}
final IBaseResource resource = toResource(resourceTable, false);
@SuppressWarnings("rawtypes") final IFhirResourceDao dao = getDao(resource.getClass());
dao.reindex(resource, resourceTable);
} catch (Exception e) {
ourLog.error("Failed to index resource {}: {}", new Object[]{resourceTable.getIdDt(), e.toString(), e});
throw new ReindexFailureException(resourceTable.getId());
}
count++;
if (count >= maxResult) {
break;
}
}
long delay = System.currentTimeMillis() - start;
long avg;
if (count > 0) {
avg = (delay / count);
ourLog.info("Indexed {} resources in {}ms - Avg {}ms / resource", new Object[]{count, delay, avg});
} else {
ourLog.debug("Indexed 0 resources in {}ms", delay);
}
return count;
// Determine the IDs needing reindexing
List<Long> idsToReindex = txTemplate.execute(theStatus -> {
int maxResult = 500;
if (theCount != null) {
maxResult = Math.min(theCount, 2000);
}
maxResult = Math.max(maxResult, 10);
ourLog.debug("Beginning indexing query with maximum {}", maxResult);
return myResourceTableDao
.findIdsOfResourcesRequiringReindexing(new PageRequest(0, maxResult))
.getContent();
});
// If no IDs need reindexing, we're good here
if (idsToReindex.isEmpty()) {
return 0;
}
// Reindex
StopWatch sw = new StopWatch();
// Execute each reindex in a task within a threadpool
int threadCount = getConfig().getReindexThreadCount();
RejectedExecutionHandler rejectHandler = new Executors.BlockPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
myReindexingThreadFactory,
rejectHandler
);
List<Future<?>> futures = new ArrayList<>();
for (Long nextId : idsToReindex) {
futures.add(executor.submit(new ResourceReindexingTask(nextId)));
}
for (Future<?> next : futures) {
try {
next.get();
} catch (Exception e) {
throw new InternalErrorException("Failed to reindex: ", e);
}
}
executor.shutdown();
ourLog.info("Reindexed {} resources in {} threads - {}ms/resource", idsToReindex.size(), threadCount, sw.getMillisPerOperation(idsToReindex.size()));
return idsToReindex.size();
}
@Override
@ -164,7 +152,7 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
public Map<String, Long> getResourceCounts() {
Map<String, Long> retVal = new HashMap<>();
List<Map<?,?>> counts = myResourceTableDao.getResourceCounts();
List<Map<?, ?>> counts = myResourceTableDao.getResourceCounts();
for (Map<?, ?> next : counts) {
retVal.put(next.get("type").toString(), Long.parseLong(next.get("count").toString()));
}
@ -213,7 +201,7 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
txTemplate.execute(new TransactionCallback<Void>() {
@Override
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
ourLog.info("Marking resource with PID {} as indexing_failed", new Object[]{theId});
ourLog.info("Marking resource with PID {} as indexing_failed", new Object[] {theId});
Query q = myEntityManager.createQuery("UPDATE ResourceTable t SET t.myIndexStatus = :status WHERE t.myId = :id");
q.setParameter("status", INDEX_STATUS_INDEXING_FAILED);
q.setParameter("id", theId);
@ -281,4 +269,58 @@ public abstract class BaseHapiFhirSystemDao<T, MT> extends BaseHapiFhirDao<IBase
}
}
private class ResourceReindexingTask implements Runnable {
private final Long myNextId;
public ResourceReindexingTask(Long theNextId) {
myNextId = theNextId;
}
@SuppressWarnings("unchecked")
@Override
public void run() {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.afterPropertiesSet();
Throwable reindexFailure = txTemplate.execute(new TransactionCallback<Throwable>() {
@Override
public Throwable doInTransaction(TransactionStatus theStatus) {
ResourceTable resourceTable = myResourceTableDao.findOne(myNextId);
try {
/*
* This part is because from HAPI 1.5 - 1.6 we changed the format of forced ID to be "type/id" instead of just "id"
*/
ForcedId forcedId = resourceTable.getForcedId();
if (forcedId != null) {
if (isBlank(forcedId.getResourceType())) {
ourLog.info("Updating resource {} forcedId type to {}", forcedId.getForcedId(), resourceTable.getResourceType());
forcedId.setResourceType(resourceTable.getResourceType());
myForcedIdDao.save(forcedId);
}
}
final IBaseResource resource = toResource(resourceTable, false);
@SuppressWarnings("rawtypes") final IFhirResourceDao dao = getDao(resource.getClass());
dao.reindex(resource, resourceTable);
return null;
} catch (Exception e) {
ourLog.error("Failed to index resource {}: {}", resourceTable.getIdDt(), e.toString(), e);
return e;
}
}
});
if (reindexFailure != null) {
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus theStatus) {
ourLog.info("Setting resource PID[{}] status to ERRORED", myNextId);
myResourceTableDao.updateStatusToErrored(myNextId);
}
});
}
}
}
}

View File

@ -18,9 +18,9 @@ import java.util.*;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -50,6 +50,12 @@ public class DaoConfig {
* Default value for {@link #setReuseCachedSearchResultsForMillis(Long)}: 60000ms (one minute)
*/
public static final Long DEFAULT_REUSE_CACHED_SEARCH_RESULTS_FOR_MILLIS = DateUtils.MILLIS_PER_MINUTE;
/**
* Default value for {@link #setTranslationCachesExpireAfterWriteInMinutes(Long)}: 60 minutes
*
* @see #setTranslationCachesExpireAfterWriteInMinutes(Long)
*/
public static final Long DEFAULT_TRANSLATION_CACHES_EXPIRE_AFTER_WRITE_IN_MINUTES = 60L;
/**
* Default value for {@link #setMaximumSearchResultCountInTransaction(Integer)}
*
@ -57,12 +63,6 @@ public class DaoConfig {
*/
private static final Integer DEFAULT_MAXIMUM_SEARCH_RESULT_COUNT_IN_TRANSACTION = null;
private IndexEnabledEnum myIndexMissingFieldsEnabled = IndexEnabledEnum.DISABLED;
/**
* Default value for {@link #setTranslationCachesExpireAfterWriteInMinutes(Long)}: 60 minutes
*
* @see #setTranslationCachesExpireAfterWriteInMinutes(Long)
*/
public static final Long DEFAULT_TRANSLATION_CACHES_EXPIRE_AFTER_WRITE_IN_MINUTES = 60L;
/**
* update setter javadoc if default changes
*/
@ -127,6 +127,7 @@ public class DaoConfig {
private IdStrategyEnum myResourceServerIdStrategy = IdStrategyEnum.SEQUENTIAL_NUMERIC;
private boolean myMarkResourcesForReindexingUponSearchParameterChange;
private boolean myExpungeEnabled;
private int myReindexThreadCount;
/**
* Constructor
@ -136,6 +137,7 @@ public class DaoConfig {
setSubscriptionPollDelay(0);
setSubscriptionPurgeInactiveAfterMillis(Long.MAX_VALUE);
setMarkResourcesForReindexingUponSearchParameterChange(true);
setReindexThreadCount(Runtime.getRuntime().availableProcessors());
}
/**
@ -152,7 +154,6 @@ public class DaoConfig {
myTreatReferencesAsLogical.add(theTreatReferencesAsLogical);
}
/**
* Specifies the highest number that a client is permitted to use in a
* <code>Cache-Control: nostore, max-results=NNN</code>
@ -470,6 +471,35 @@ public class DaoConfig {
myMaximumSearchResultCountInTransaction = theMaximumSearchResultCountInTransaction;
}
/**
* This setting controls the number of threads allocated to resource reindexing
* (which is only ever used if SearchParameters change, or a manual reindex is
* triggered due to a HAPI FHIR upgrade or some other reason).
* <p>
* The default value is set to the number of available processors
* (via <code>Runtime.getRuntime().availableProcessors()</code>). Value
* for this setting must be a positive integer.
* </p>
*/
public int getReindexThreadCount() {
return myReindexThreadCount;
}
/**
* This setting controls the number of threads allocated to resource reindexing
* (which is only ever used if SearchParameters change, or a manual reindex is
* triggered due to a HAPI FHIR upgrade or some other reason).
* <p>
* The default value is set to the number of available processors
* (via <code>Runtime.getRuntime().availableProcessors()</code>). Value
* for this setting must be a positive integer.
* </p>
*/
public void setReindexThreadCount(int theReindexThreadCount) {
myReindexThreadCount = theReindexThreadCount;
myReindexThreadCount = Math.max(myReindexThreadCount, 1); // Minimum of 1
}
public ResourceEncodingEnum getResourceEncoding() {
return myResourceEncoding;
}

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.dao.data;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.entity.ResourceTable;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@ -20,9 +21,9 @@ import java.util.Map;
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -43,13 +44,16 @@ public interface IResourceTableDao extends JpaRepository<ResourceTable, Long> {
Slice<Long> findIdsOfDeletedResourcesOfType(Pageable thePageable, @Param("resid") Long theResourceId, @Param("restype") String theResourceName);
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myIndexStatus IS NULL")
Slice<Long> findUnindexed(Pageable thePageRequest);
Slice<Long> findIdsOfResourcesRequiringReindexing(Pageable thePageable);
@Query("SELECT t.myResourceType as type, COUNT(*) as count FROM ResourceTable t GROUP BY t.myResourceType")
List<Map<?,?>> getResourceCounts();
List<Map<?, ?>> getResourceCounts();
@Modifying
@Query("UPDATE ResourceTable r SET r.myIndexStatus = null WHERE r.myResourceType = :restype")
int markResourcesOfTypeAsRequiringReindexing(@Param("restype") String theResourceType);
@Modifying
@Query("UPDATE ResourceTable r SET r.myIndexStatus = " + BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED + " WHERE r.myId = :resid")
void updateStatusToErrored(@Param("resid") Long theId);
}

View File

@ -134,9 +134,13 @@ public abstract class BaseResourceIndexedSearchParam implements Serializable {
Hasher hasher = HASH_FUNCTION.newHasher();
for (String next : theValues) {
next = UrlUtil.escapeUrlParam(next);
byte[] bytes = next.getBytes(Charsets.UTF_8);
hasher.putBytes(bytes);
if (next == null) {
hasher.putByte((byte) 0);
} else {
next = UrlUtil.escapeUrlParam(next);
byte[] bytes = next.getBytes(Charsets.UTF_8);
hasher.putBytes(bytes);
}
hasher.putBytes(DELIMITER_BYTES);
}

View File

@ -1,5 +1,14 @@
package ca.uhn.fhir.jpa.util;
public interface IReindexController {
/**
* This method is called automatically by the scheduler
*/
void performReindexingPass();
/**
* This method requests that the reindex process happen as soon as possible
*/
void requestReindex();
}

View File

@ -35,13 +35,14 @@ public class ReindexController implements IReindexController {
*/
@Scheduled(fixedDelay = DateUtils.MILLIS_PER_MINUTE)
@Transactional(propagation = Propagation.NEVER)
@Override
public void performReindexingPass() {
if (myDaoConfig.isSchedulingDisabled()) {
return;
}
synchronized (this) {
if (myDontReindexUntil == null && myDontReindexUntil > System.currentTimeMillis()) {
if (myDontReindexUntil != null && myDontReindexUntil > System.currentTimeMillis()) {
return;
}
}

View File

@ -477,7 +477,7 @@
<caffeine_version>2.6.2</caffeine_version>
<commons_codec_version>1.10</commons_codec_version>
<commons_io_version>2.5</commons_io_version>
<commons_lang3_version>3.6</commons_lang3_version>
<commons_lang3_version>3.7</commons_lang3_version>
<derby_version>10.14.2.0</derby_version>
<error_prone_annotations_version>2.0.18</error_prone_annotations_version>
<guava_version>23.0</guava_version>

View File

@ -12,6 +12,7 @@
latest versions (dependent HAPI modules listed in brackets):
<![CDATA[
<ul>
<li>Commons-Lang3 (All): 3.6 -&gt; 3.7</li>
<li>Hibernate (JPA): 5.2.12.Final -&gt; 5.2.16.Final</li>
<li>Javassist (JPA): 3.20.0-GA -&gt; 3.22.0-GA</li>
</ul>
@ -243,7 +244,9 @@
<action type="add">
The JPA server automatic reindexing process has been tweaked so that it no
longer runs once per minute (this was a heavy strain on large databases)
but will instead run once an hour unless triggered for some reason.
but will instead run once an hour unless triggered for some reason. In addition,
the number of threads allocated to reindexing may now be adjusted via a
setting in the DaoConfig.
</action>
</release>
<release version="3.3.0" date="2018-03-29">