Convert Reindex into Batch2 Job (#3458)

* Start working on reindex job

* Updates

* Ongoing work

* Ongoing work

* Work on reindex

* Reindex work

* Add logging

* Fix typo

* Test fix

* Test fix

* Address fixme

* Liocense header

* Resolve fixme

* Add logging

* Update logs

* Address review comments

* License header

* Test fixes

* Test fix

* Test fixes

* Test fix

* Version bump

* Version bump
This commit is contained in:
James Agnew 2022-03-19 16:07:58 -04:00 committed by GitHub
parent f518f30506
commit f82534ca51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
152 changed files with 2410 additions and 1158 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -42,14 +42,6 @@ public final class BatchConstants {
* Delete Expunge
*/
public static final String DELETE_EXPUNGE_JOB_NAME = "deleteExpungeJob";
/**
* Reindex
*/
public static final String REINDEX_JOB_NAME = "reindexJob";
/**
* Reindex Everything
*/
public static final String REINDEX_EVERYTHING_JOB_NAME = "reindexEverythingJob";
/**
* MDM Clear
*/

View File

@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -27,8 +27,6 @@ import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
import ca.uhn.fhir.jpa.config.BatchJobRegisterer;
import ca.uhn.fhir.jpa.delete.job.DeleteExpungeJobConfig;
import ca.uhn.fhir.jpa.reindex.job.ReindexEverythingJobConfig;
import ca.uhn.fhir.jpa.reindex.job.ReindexJobConfig;
import ca.uhn.fhir.jpa.term.job.TermCodeSystemDeleteJobConfig;
import ca.uhn.fhir.jpa.term.job.TermCodeSystemVersionDeleteJobConfig;
import org.springframework.batch.core.configuration.JobRegistry;
@ -48,8 +46,6 @@ import org.springframework.context.annotation.Import;
BulkExportJobConfig.class,
BulkImportJobConfig.class,
DeleteExpungeJobConfig.class,
ReindexJobConfig.class,
ReindexEverythingJobConfig.class,
MdmClearJobConfig.class,
TermCodeSystemDeleteJobConfig.class,
TermCodeSystemVersionDeleteJobConfig.class

View File

@ -27,9 +27,7 @@ import ca.uhn.fhir.jpa.batch.processor.GoldenResourceAnnotatingProcessor;
import ca.uhn.fhir.jpa.batch.processor.PidToIBaseResourceProcessor;
import ca.uhn.fhir.jpa.batch.reader.ReverseCronologicalBatchResourcePidReader;
import ca.uhn.fhir.jpa.batch.writer.SqlExecutorWriter;
import ca.uhn.fhir.jpa.reindex.job.ReindexWriter;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -73,10 +71,4 @@ public class CommonBatchJobConfig {
return new GoldenResourceAnnotatingProcessor();
}
@Bean
@StepScope
public ReindexWriter reindexWriter() {
return new ReindexWriter();
}
}

View File

@ -174,6 +174,11 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(theChunkId, new Date(), theRecordsProcessed, StatusEnum.COMPLETED);
}
@Override
public void incrementWorkChunkErrorCount(String theChunkId, int theIncrementBy) {
myWorkChunkRepository.incrementWorkChunkErrorCount(theChunkId, theIncrementBy);
}
@Override
public List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int thePageSize, int thePageIndex) {
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);

View File

@ -9,6 +9,7 @@ import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IDao;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
@ -84,7 +85,7 @@ import ca.uhn.fhir.jpa.provider.SubscriptionTriggeringProvider;
import ca.uhn.fhir.jpa.provider.TerminologyUploaderProvider;
import ca.uhn.fhir.jpa.provider.ValueSetOperationProvider;
import ca.uhn.fhir.jpa.provider.r4.MemberMatcherR4Helper;
import ca.uhn.fhir.jpa.reindex.ReindexJobSubmitterImpl;
import ca.uhn.fhir.jpa.reindex.ResourceReindexSvcImpl;
import ca.uhn.fhir.jpa.sched.AutowiringSpringBeanJobFactory;
import ca.uhn.fhir.jpa.sched.HapiSchedulerServiceImpl;
import ca.uhn.fhir.jpa.search.PersistedJpaBundleProvider;
@ -137,12 +138,10 @@ import ca.uhn.fhir.jpa.validation.ValidationSettings;
import ca.uhn.fhir.mdm.api.IMdmClearJobSubmitter;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IDeleteExpungeJobSubmitter;
import ca.uhn.fhir.rest.api.server.storage.IReindexJobSubmitter;
import ca.uhn.fhir.rest.server.interceptor.ResponseTerminologyTranslationInterceptor;
import ca.uhn.fhir.rest.server.interceptor.consent.IConsentContextServices;
import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import ca.uhn.fhir.rest.server.provider.DeleteExpungeProvider;
import ca.uhn.fhir.rest.server.provider.ReindexProvider;
import ca.uhn.fhir.util.ThreadPoolUtil;
import org.hl7.fhir.common.hapi.validation.support.UnknownCodeSystemWarningValidationSupport;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -463,24 +462,12 @@ public class JpaConfig {
return new PartitionedUrlValidator();
}
@Bean
@Lazy
public IReindexJobSubmitter myReindexJobSubmitter() {
return new ReindexJobSubmitterImpl();
}
@Bean
@Lazy
public DeleteExpungeProvider deleteExpungeProvider(FhirContext theFhirContext, IDeleteExpungeJobSubmitter theDeleteExpungeJobSubmitter) {
return new DeleteExpungeProvider(theFhirContext, theDeleteExpungeJobSubmitter);
}
@Bean
@Lazy
public ReindexProvider reindexProvider(FhirContext theFhirContext, IReindexJobSubmitter theReindexJobSubmitter) {
return new ReindexProvider(theFhirContext, theReindexJobSubmitter);
}
@Bean
@Lazy
public IBulkDataImportSvc bulkDataImportSvc() {
@ -517,6 +504,10 @@ public class JpaConfig {
return new ResourceVersionSvcDaoImpl();
}
@Bean
public IResourceReindexSvc resourceReindexSvc() {
return new ResourceReindexSvcImpl();
}
/* **************************************************************** *
* Prototype Beans Below *

View File

@ -1139,7 +1139,7 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
} catch (Exception e) {
StringBuilder b = new StringBuilder();
b.append("Failed to parse database resource[");
b.append(resourceType);
b.append(myFhirContext.getResourceType(resourceType));
b.append("/");
b.append(theEntity.getIdDt().getIdPart());
b.append(" (pid ");

View File

@ -59,11 +59,11 @@ import ca.uhn.fhir.jpa.search.reindex.IResourceReindexingSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.searchparam.extractor.ResourceIndexedSearchParams;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.dstu2.resource.ListResource;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.EncodingEnum;
@ -86,6 +86,7 @@ import ca.uhn.fhir.rest.param.HasParam;
import ca.uhn.fhir.rest.server.IPagingProvider;
import ca.uhn.fhir.rest.server.IRestfulServerDefaults;
import ca.uhn.fhir.rest.server.RestfulServerUtils;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.MethodNotAllowedException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
@ -333,10 +334,10 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
if (resourceHadIdBeforeStorage) {
if (resourceIdWasServerAssigned) {
boolean createForPureNumericIds = true;
createForcedIdIfNeeded(entity, resourceIdBeforeStorage, createForPureNumericIds, persistentId, theRequestPartitionId);
createForcedIdIfNeeded(entity, resourceIdBeforeStorage, createForPureNumericIds);
} else {
boolean createForPureNumericIds = getConfig().getResourceClientIdStrategy() != DaoConfig.ClientIdStrategyEnum.ALPHANUMERIC;
createForcedIdIfNeeded(entity, resourceIdBeforeStorage, createForPureNumericIds, persistentId, theRequestPartitionId);
createForcedIdIfNeeded(entity, resourceIdBeforeStorage, createForPureNumericIds);
}
} else {
switch (getConfig().getResourceClientIdStrategy()) {
@ -345,7 +346,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
break;
case ANY:
boolean createForPureNumericIds = true;
createForcedIdIfNeeded(updatedEntity, theResource.getIdElement().getIdPart(), createForPureNumericIds, persistentId, theRequestPartitionId);
createForcedIdIfNeeded(updatedEntity, theResource.getIdElement().getIdPart(), createForPureNumericIds);
// for client ID mode ANY, we will always have a forced ID. If we ever
// stop populating the transient forced ID be warned that we use it
// (and expect it to be set correctly) farther below.
@ -399,7 +400,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return outcome;
}
private void createForcedIdIfNeeded(ResourceTable theEntity, String theResourceId, boolean theCreateForPureNumericIds, ResourcePersistentId thePersistentId, RequestPartitionId theRequestPartitionId) {
private void createForcedIdIfNeeded(ResourceTable theEntity, String theResourceId, boolean theCreateForPureNumericIds) {
if (isNotBlank(theResourceId) && theEntity.getForcedId() == null) {
if (theCreateForPureNumericIds || !IdHelperService.isValidPid(theResourceId)) {
ForcedId forcedId = new ForcedId();
@ -1288,6 +1289,25 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return Long.toString(readEntity(theReferenceElement.toVersionless(), null).getVersion());
}
@SuppressWarnings("unchecked")
@Override
public void reindex(ResourcePersistentId theResourcePersistentId, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
Optional<ResourceTable> entityOpt = myResourceTableDao.findById(theResourcePersistentId.getIdAsLong());
if (!entityOpt.isPresent()) {
ourLog.warn("Unable to find entity with PID: {}", theResourcePersistentId.getId());
return;
}
ResourceTable entity = entityOpt.get();
try {
T resource = (T) toResource(entity, false);
reindex(resource, entity);
} catch (BaseServerResponseException | DataFormatException e) {
myResourceTableDao.updateIndexStatus(entity.getId(), INDEX_STATUS_INDEXING_FAILED);
throw e;
}
}
@Override
@Transactional
public BaseHasResource readEntity(IIdType theId, boolean theCheckForForcedId, RequestDetails theRequest) {
@ -1569,7 +1589,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
}
@Override
public Set<ResourcePersistentId> searchForIds(SearchParameterMap theParams, RequestDetails theRequest, @Nullable IBaseResource theConditionalOperationTargetOrNull) {
public List<ResourcePersistentId> searchForIds(SearchParameterMap theParams, RequestDetails theRequest, @Nullable IBaseResource theConditionalOperationTargetOrNull) {
TransactionDetails transactionDetails = new TransactionDetails();
return myTransactionService.execute(theRequest, transactionDetails, tx -> {
@ -1582,7 +1602,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
ISearchBuilder builder = mySearchBuilderFactory.newSearchBuilder(this, getResourceName(), getResourceType());
HashSet<ResourcePersistentId> retVal = new HashSet<>();
List<ResourcePersistentId> ids = new ArrayList<>();
String uuid = UUID.randomUUID().toString();
RequestPartitionId requestPartitionId = myRequestPartitionHelperService.determineReadPartitionForRequestForSearchType(theRequest, getResourceName(), theParams, theConditionalOperationTargetOrNull);
@ -1590,13 +1610,13 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
SearchRuntimeDetails searchRuntimeDetails = new SearchRuntimeDetails(theRequest, uuid);
try (IResultIterator iter = builder.createQuery(theParams, searchRuntimeDetails, theRequest, requestPartitionId)) {
while (iter.hasNext()) {
retVal.add(iter.next());
ids.add(iter.next());
}
} catch (IOException e) {
ourLog.error("IO failure during database access", e);
}
return retVal;
return ids;
});
}
@ -1929,10 +1949,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
myIdHelperService = theIdHelperService;
}
private static ResourceIndexedSearchParams toResourceIndexedSearchParams(ResourceTable theEntity) {
return new ResourceIndexedSearchParams(theEntity);
}
private static class IdChecker implements IValidatorModule {
private final ValidationModeEnum myMode;

View File

@ -180,7 +180,7 @@ public class FhirResourceDaoValueSetDstu2 extends BaseHapiFhirResourceDao<ValueS
}
List<IIdType> valueSetIds;
Set<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(ValueSet.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
List<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(ValueSet.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
valueSetIds = new ArrayList<>();
for (ResourcePersistentId next : ids) {
IIdType id = myIdHelperService.translatePidIdToForcedId(myFhirContext, "ValueSet", next);

View File

@ -51,4 +51,8 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
@Modifying
@Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
void deleteAllForInstance(@Param("instanceId") String theInstanceId);
@Modifying
@Query("UPDATE Batch2WorkChunkEntity e SET e.myErrorCount = e.myErrorCount + :by WHERE e.myId = :id")
void incrementWorkChunkErrorCount(@Param("id") String theChunkId, @Param("by") int theIncrementBy);
}

View File

@ -57,6 +57,24 @@ public interface IResourceTableDao extends JpaRepository<ResourceTable, Long>, I
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high ORDER BY t.myUpdated ASC")
Slice<Long> findIdsOfResourcesWithinUpdatedRangeOrderedFromOldest(Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh);
/**
* @return List of arrays containing [PID, resourceType, lastUpdated]
*/
@Query("SELECT t.myId, t.myResourceType, t.myUpdated FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high ORDER BY t.myUpdated ASC")
Slice<Object[]> findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh);
/**
* @return List of arrays containing [PID, resourceType, lastUpdated]
*/
@Query("SELECT t.myId, t.myResourceType, t.myUpdated FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high AND t.myPartitionIdValue IN (:partition_ids) ORDER BY t.myUpdated ASC")
Slice<Object[]> findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh, @Param("partition_ids") List<Integer> theRequestPartitionIds);
/**
* @return List of arrays containing [PID, resourceType, lastUpdated]
*/
@Query("SELECT t.myId, t.myResourceType, t.myUpdated FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high ORDER BY t.myUpdated ASC")
Slice<Object[]> findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh);
// TODO in the future, consider sorting by pid as well so batch jobs process in the same order across restarts
@Query("SELECT t.myId FROM ResourceTable t WHERE t.myUpdated >= :low AND t.myUpdated <= :high AND t.myPartitionIdValue = :partition_id ORDER BY t.myUpdated ASC")
Slice<Long> findIdsOfPartitionedResourcesWithinUpdatedRangeOrderedFromOldest(Pageable thePage, @Param("low") Date theLow, @Param("high") Date theHigh, @Param("partition_id") Integer theRequestPartitionId);

View File

@ -76,7 +76,7 @@ public class FhirResourceDaoCodeSystemDstu3 extends BaseHapiFhirResourceDao<Code
@Override
public List<IIdType> findCodeSystemIdsContainingSystemAndCode(String theCode, String theSystem, RequestDetails theRequest) {
Set<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(CodeSystem.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
List<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(CodeSystem.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
List<IIdType> valueSetIds = new ArrayList<>();
for (ResourcePersistentId next : ids) {
IIdType id = myIdHelperService.translatePidIdToForcedId(myFhirContext, "CodeSystem", next);

View File

@ -74,7 +74,7 @@ public class FhirResourceDaoCodeSystemR4 extends BaseHapiFhirResourceDao<CodeSys
@Override
public List<IIdType> findCodeSystemIdsContainingSystemAndCode(String theCode, String theSystem, RequestDetails theRequest) {
List<IIdType> valueSetIds;
Set<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(CodeSystem.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
List<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(CodeSystem.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
valueSetIds = new ArrayList<>();
for (ResourcePersistentId next : ids) {
IIdType id = myIdHelperService.translatePidIdToForcedId(myFhirContext, "CodeSystem", next);

View File

@ -75,7 +75,7 @@ public class FhirResourceDaoCodeSystemR5 extends BaseHapiFhirResourceDao<CodeSys
@Override
public List<IIdType> findCodeSystemIdsContainingSystemAndCode(String theCode, String theSystem, RequestDetails theRequest) {
List<IIdType> valueSetIds;
Set<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(org.hl7.fhir.r4.model.CodeSystem.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
List<ResourcePersistentId> ids = searchForIds(new SearchParameterMap(org.hl7.fhir.r4.model.CodeSystem.SP_CODE, new TokenParam(theSystem, theCode)), theRequest);
valueSetIds = new ArrayList<>();
for (ResourcePersistentId next : ids) {
IIdType id = myIdHelperService.translatePidIdToForcedId(myFhirContext, "CodeSystem", next);

View File

@ -272,6 +272,11 @@ public class HapiFhirJpaMigrationTasks extends BaseMigrationTasks<VersionEnum> {
replaceNumericSPIndices(version);
replaceQuantitySPIndices(version);
// Drop Index on HFJ_RESOURCE.INDEX_STATUS
version
.onTable("HFJ_RESOURCE")
.dropIndex("20220314.1", "IDX_INDEXSTATUS");
}
/**

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.jpa.provider;
* #L%
*/
import ca.uhn.fhir.batch2.jobs.reindex.ReindexProvider;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.model.api.annotation.Description;
import ca.uhn.fhir.rest.annotation.Operation;
@ -45,7 +46,7 @@ public abstract class BaseJpaSystemProviderDstu2Plus<T, MT> extends BaseJpaSyste
})
/**
* @deprecated
* @see ca.uhn.fhir.rest.server.provider.ReindexProvider#Reindex(List, IPrimitiveType, RequestDetails)
* @see ReindexProvider#Reindex(List, IPrimitiveType, RequestDetails)
*/
@Deprecated
public IBaseResource markAllResourcesForReindexing(
@ -72,7 +73,7 @@ public abstract class BaseJpaSystemProviderDstu2Plus<T, MT> extends BaseJpaSyste
})
/**
* @deprecated
* @see ca.uhn.fhir.rest.server.provider.ReindexProvider#Reindex(List, IPrimitiveType, RequestDetails)
* @see ReindexProvider#Reindex(List, IPrimitiveType, RequestDetails)
*/
@Deprecated
public IBaseResource performReindexingPass() {

View File

@ -1,108 +0,0 @@
package ca.uhn.fhir.jpa.reindex;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.job.PartitionedUrlValidator;
import ca.uhn.fhir.jpa.batch.job.model.RequestListJson;
import ca.uhn.fhir.jpa.batch.reader.CronologicalBatchAllResourcePidReader;
import ca.uhn.fhir.jpa.batch.reader.ReverseCronologicalBatchResourcePidReader;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IReindexJobSubmitter;
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import javax.transaction.Transactional;
import java.util.List;
public class ReindexJobSubmitterImpl implements IReindexJobSubmitter {
@Autowired
PartitionedUrlValidator myPartitionedUrlValidator;
@Autowired
DaoConfig myDaoConfig;
@Autowired
private ISearchParamRegistry mySearchParamRegistry;
@Autowired
private IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
@Qualifier(BatchConstants.REINDEX_JOB_NAME)
private Job myReindexJob;
@Autowired
@Qualifier(BatchConstants.REINDEX_EVERYTHING_JOB_NAME)
private Job myReindexEverythingJob;
@Override
@Transactional(Transactional.TxType.NEVER)
public JobExecution submitJob(Integer theBatchSize, List<String> theUrlsToReindex, RequestDetails theRequest) throws JobParametersInvalidException {
if (theBatchSize == null) {
theBatchSize = myDaoConfig.getReindexBatchSize();
}
RequestListJson requestListJson = myPartitionedUrlValidator.buildRequestListJson(theRequest, theUrlsToReindex);
if (!myDaoConfig.isReindexEnabled()) {
throw new ForbiddenOperationException(Msg.code(1273) + "Reindexing is disabled on this server.");
}
/*
* On the first time we run a particular reindex job, let's make sure we
* have the latest search parameters loaded. A common reason to
* be reindexing is that the search parameters have changed in some way, so
* this makes sure we're on the latest versions
*/
mySearchParamRegistry.forceRefresh();
JobParameters jobParameters = ReverseCronologicalBatchResourcePidReader.buildJobParameters(ProviderConstants.OPERATION_REINDEX, theBatchSize, requestListJson);
return myBatchJobSubmitter.runJob(myReindexJob, jobParameters);
}
@Override
@Transactional(Transactional.TxType.NEVER)
public JobExecution submitEverythingJob(Integer theBatchSize, RequestDetails theRequest) throws JobParametersInvalidException {
if (theBatchSize == null) {
theBatchSize = myDaoConfig.getReindexBatchSize();
}
RequestPartitionId requestPartitionId = myPartitionedUrlValidator.requestPartitionIdFromRequest(theRequest);
if (!myDaoConfig.isReindexEnabled()) {
throw new ForbiddenOperationException(Msg.code(1274) + "Reindexing is disabled on this server.");
}
/*
* On the first time we run a particular reindex job, let's make sure we
* have the latest search parameters loaded. A common reason to
* be reindexing is that the search parameters have changed in some way, so
* this makes sure we're on the latest versions
*/
mySearchParamRegistry.forceRefresh();
JobParameters jobParameters = CronologicalBatchAllResourcePidReader.buildJobParameters(theBatchSize, requestPartitionId);
return myBatchJobSubmitter.runJob(myReindexEverythingJob, jobParameters);
}
}

View File

@ -0,0 +1,141 @@
package ca.uhn.fhir.jpa.reindex;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
public class ResourceReindexSvcImpl implements IResourceReindexSvc {
@Autowired
private IResourceTableDao myResourceTableDao;
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myFhirContext;
@Override
public boolean isAllResourceTypeSupported() {
return true;
}
@Override
@Transactional
public IdChunk fetchResourceIdsPage(Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
int pageSize = 20000;
if (theUrl == null) {
return fetchResourceIdsPageNoUrl(theStart, theEnd, pageSize, theRequestPartitionId);
} else {
return fetchResourceIdsPageWithUrl(theStart, theEnd, pageSize, theUrl, theRequestPartitionId);
}
}
private IdChunk fetchResourceIdsPageWithUrl(Date theStart, Date theEnd, int thePageSize, String theUrl, RequestPartitionId theRequestPartitionId) {
String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType);
SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theUrl, def);
searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.ASC));
searchParamMap.setLastUpdated(new DateRangeParam(theStart, theEnd));
searchParamMap.setCount(thePageSize);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);
SystemRequestDetails request = new SystemRequestDetails();
request.setRequestPartitionId(theRequestPartitionId);
List<ResourcePersistentId> ids = dao.searchForIds(searchParamMap, request);
// just a list of the same size where every element is the same resource type
List<String> resourceTypes = ids
.stream()
.map(t -> resourceType)
.collect(Collectors.toList());
Date lastDate = null;
if (ids.size() > 0) {
lastDate = dao.readByPid(ids.get(ids.size() - 1)).getMeta().getLastUpdated();
}
return new IdChunk(ids, resourceTypes, lastDate);
}
@Nonnull
private IdChunk fetchResourceIdsPageNoUrl(Date theStart, Date theEnd, int thePagesize, RequestPartitionId theRequestPartitionId) {
Pageable page = Pageable.ofSize(thePagesize);
Slice<Object[]> slice;
if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
slice = myResourceTableDao.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldest(page, theStart, theEnd);
} else if (theRequestPartitionId.isDefaultPartition()) {
slice = myResourceTableDao.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForDefaultPartition(page, theStart, theEnd);
} else {
slice = myResourceTableDao.findIdsTypesAndUpdateTimesOfResourcesWithinUpdatedRangeOrderedFromOldestForPartitionIds(page, theStart, theEnd, theRequestPartitionId.getPartitionIds());
}
List<Object[]> content = slice.getContent();
if (content.isEmpty()) {
return new IdChunk(Collections.emptyList(), Collections.emptyList(), null);
}
List<ResourcePersistentId> ids = content
.stream()
.map(t -> new ResourcePersistentId(t[0]))
.collect(Collectors.toList());
List<String> types = content
.stream()
.map(t -> (String) t[1])
.collect(Collectors.toList());
Date lastDate = (Date) content.get(content.size() - 1)[2];
return new IdChunk(ids, types, lastDate);
}
}

View File

@ -1,90 +0,0 @@
package ca.uhn.fhir.jpa.reindex.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.listener.PidReaderCounterListener;
import ca.uhn.fhir.jpa.batch.reader.CronologicalBatchAllResourcePidReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.List;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.REINDEX_EVERYTHING_JOB_NAME;
/**
* Spring batch Job configuration file. Contains all necessary plumbing to run a
* Reindex job.
*/
@Configuration
public class ReindexEverythingJobConfig {
public static final String REINDEX_EVERYTHING_STEP_NAME = "reindex-everything-step";
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private ReindexWriter myReindexWriter;
@Autowired
private PidReaderCounterListener myPidCountRecorderListener;
@Bean(name = REINDEX_EVERYTHING_JOB_NAME)
@Lazy
public Job reindexJob() {
return myJobBuilderFactory.get(REINDEX_EVERYTHING_JOB_NAME)
.start(reindexEverythingStep())
.build();
}
@Bean
public Step reindexEverythingStep() {
return myStepBuilderFactory.get(REINDEX_EVERYTHING_STEP_NAME)
.<List<Long>, List<Long>>chunk(1)
.reader(cronologicalBatchAllResourcePidReader())
.writer(myReindexWriter)
.listener(myPidCountRecorderListener)
.listener(reindexEverythingPromotionListener())
.build();
}
@Bean
@StepScope
public CronologicalBatchAllResourcePidReader cronologicalBatchAllResourcePidReader() {
return new CronologicalBatchAllResourcePidReader();
}
@Bean
public ExecutionContextPromotionListener reindexEverythingPromotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{PidReaderCounterListener.RESOURCE_TOTAL_PROCESSED});
return listener;
}
}

View File

@ -1,92 +0,0 @@
package ca.uhn.fhir.jpa.reindex.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.batch.job.MultiUrlJobParameterValidator;
import ca.uhn.fhir.jpa.batch.listener.PidReaderCounterListener;
import ca.uhn.fhir.jpa.batch.reader.ReverseCronologicalBatchResourcePidReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import java.util.List;
import static ca.uhn.fhir.jpa.batch.config.BatchConstants.REINDEX_JOB_NAME;
/**
* Spring batch Job configuration file. Contains all necessary plumbing to run a
* Reindex job.
*/
@Configuration
public class ReindexJobConfig {
public static final String REINDEX_URL_LIST_STEP_NAME = "reindex-url-list-step";
@Autowired
private StepBuilderFactory myStepBuilderFactory;
@Autowired
private JobBuilderFactory myJobBuilderFactory;
@Autowired
private ReindexWriter myReindexWriter;
@Autowired
private MultiUrlJobParameterValidator myMultiUrlProcessorParameterValidator;
@Autowired
private PidReaderCounterListener myPidCountRecorderListener;
@Autowired
private ReverseCronologicalBatchResourcePidReader myReverseCronologicalBatchResourcePidReader;
@Bean(name = REINDEX_JOB_NAME)
@Lazy
public Job reindexJob() {
return myJobBuilderFactory.get(REINDEX_JOB_NAME)
.validator(myMultiUrlProcessorParameterValidator)
.start(reindexUrlListStep())
.build();
}
@Bean
public Step reindexUrlListStep() {
return myStepBuilderFactory.get(REINDEX_URL_LIST_STEP_NAME)
.<List<Long>, List<Long>>chunk(1)
.reader(myReverseCronologicalBatchResourcePidReader)
.writer(myReindexWriter)
.listener(myPidCountRecorderListener)
.listener(reindexPromotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener reindexPromotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{PidReaderCounterListener.RESOURCE_TOTAL_PROCESSED});
return listener;
}
}

View File

@ -1,67 +0,0 @@
package ca.uhn.fhir.jpa.reindex.job;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.expunge.PartitionRunner;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.SliceImpl;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
/**
* Reindex the provided list of pids of resources
*/
public class ReindexWriter implements ItemWriter<List<Long>> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexWriter.class);
public static final String PROCESS_NAME = "Reindexing";
public static final String THREAD_PREFIX = "reindex";
@Autowired
ResourceReindexer myResourceReindexer;
@Autowired
DaoConfig myDaoConfig;
@Autowired
protected PlatformTransactionManager myTxManager;
@Override
public void write(List<? extends List<Long>> thePidLists) throws Exception {
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getReindexBatchSize(), myDaoConfig.getReindexThreadCount());
// Note that since our chunk size is 1, there will always be exactly one list
for (List<Long> pidList : thePidLists) {
partitionRunner.runInPartitionedThreads(new SliceImpl<>(pidList), pids -> reindexPids(pids));
}
}
private void reindexPids(List<Long> pidList) {
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.executeWithoutResult(t -> pidList.forEach(pid -> myResourceReindexer.readAndReindexResourceByPid(pid)));
}
}

View File

@ -1,19 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

View File

@ -1,36 +0,0 @@
package ca.uhn.fhir.jpa.util;
/*
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
public class ReindexFailureException extends RuntimeException {
private static final long serialVersionUID = 1L;
private Long myResourceId;
public ReindexFailureException(Long theResourceId) {
myResourceId = theResourceId;
}
public Long getResourceId() {
return myResourceId;
}
}

View File

@ -260,6 +260,27 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
}
@Test
public void testIncrementWorkChunkErrorCount() {
// Setup
JobInstance instance = createInstance();
String instanceId = mySvc.storeNewInstance(instance);
String chunkId = mySvc.storeWorkChunk("definition-chunkId", 1, "step-chunkId", instanceId, 1, null);
assertNotNull(chunkId);
// Execute
mySvc.incrementWorkChunkErrorCount(chunkId, 2);
mySvc.incrementWorkChunkErrorCount(chunkId, 3);
// Verify
List<WorkChunk> chunks = mySvc.fetchWorkChunksWithoutData(instanceId, 100, 0);
assertEquals(1, chunks.size());
assertEquals(5, chunks.get(0).getErrorCount());
}
@Test
public void testMarkChunkAsCompleted_Error() {
JobInstance instance = createInstance();

View File

@ -2,7 +2,7 @@ package ca.uhn.fhir.jpa.bulk.imprt2;
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImport2AppCtx;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportAppCtx;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportFileServlet;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
@ -87,7 +87,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
}
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(parameters);
// Execute
@ -134,7 +134,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
}
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(parameters);
IAnonymousInterceptor anonymousInterceptor = (thePointcut, theArgs) -> {
@ -212,7 +212,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
}
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(parameters);
// Execute
@ -249,7 +249,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
parameters.addNdJsonUrl(url);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(parameters);
IAnonymousInterceptor anonymousInterceptor = (thePointcut, theArgs) -> {
@ -294,7 +294,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
// Setup
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
// Execute
@ -314,7 +314,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
// Setup
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(new BulkImportJobParameters());
// Execute
@ -325,7 +325,10 @@ public class BulkImportR4Test extends BaseJpaR4Test {
} catch (InvalidRequestException e) {
// Verify
assertEquals("HAPI-2039: Failed to validate parameters for job of type BULK_IMPORT_PULL: [myNdJsonUrls At least one NDJSON URL must be provided]", e.getMessage());
String expected = """
HAPI-2039: Failed to validate parameters for job of type BULK_IMPORT_PULL:\s
* myNdJsonUrls - At least one NDJSON URL must be provided""";
assertEquals(expected, e.getMessage());
}
}
@ -338,7 +341,7 @@ public class BulkImportR4Test extends BaseJpaR4Test {
parameters.addNdJsonUrl("foo");
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(parameters);
// Execute
@ -349,7 +352,10 @@ public class BulkImportR4Test extends BaseJpaR4Test {
} catch (InvalidRequestException e) {
// Verify
assertEquals("HAPI-2039: Failed to validate parameters for job of type BULK_IMPORT_PULL: [myNdJsonUrls[0].<list element> Must be a valid URL]", e.getMessage());
String expected = """
HAPI-2039: Failed to validate parameters for job of type BULK_IMPORT_PULL:\s
* myNdJsonUrls[0].<list element> - Must be a valid URL""";
assertEquals(expected, e.getMessage());
}
}

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.batch2.jobs.config.Batch2JobsConfig;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.batch2.JpaBatch2Config;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
import ca.uhn.fhir.jpa.util.CircularQueueCaptureQueriesListener;
@ -34,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.fail;
JpaDstu2Config.class,
HapiJpaConfig.class,
TestJPAConfig.class,
JpaBatch2Config.class,
Batch2JobsConfig.class,
TestHibernateSearchAddInConfig.DefaultLuceneHeap.class
})
public class TestDstu2Config {

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.batch2.jobs.config.Batch2JobsConfig;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.batch2.JpaBatch2Config;
import ca.uhn.fhir.jpa.config.dstu3.JpaDstu3Config;
import ca.uhn.fhir.jpa.config.util.HapiEntityManagerFactoryUtil;
import ca.uhn.fhir.jpa.model.dialect.HapiFhirH2Dialect;
@ -38,6 +40,8 @@ import static org.junit.jupiter.api.Assertions.fail;
JpaDstu3Config.class,
HapiJpaConfig.class,
TestJPAConfig.class,
JpaBatch2Config.class,
Batch2JobsConfig.class,
TestHibernateSearchAddInConfig.DefaultLuceneHeap.class
})
public class TestDstu3Config {

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
import ca.uhn.fhir.jpa.subscription.match.deliver.resthook.SubscriptionDeliveringRestHookSubscriber;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import ca.uhn.fhir.jpa.util.Batch2JobHelper;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.context.annotation.Bean;
@ -79,6 +80,11 @@ public class TestJPAConfig {
return new BatchJobHelper(theJobExplorer);
}
@Bean
public Batch2JobHelper batch2JobHelper() {
return new Batch2JobHelper();
}
@Bean
@Lazy
public IBinaryStorageSvc binaryStorage() {

View File

@ -1,6 +1,8 @@
package ca.uhn.fhir.jpa.config;
import ca.uhn.fhir.batch2.jobs.config.Batch2JobsConfig;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.batch2.JpaBatch2Config;
import ca.uhn.fhir.jpa.binstore.IBinaryStorageSvc;
import ca.uhn.fhir.jpa.binstore.MemoryBinaryStorageSvcImpl;
import ca.uhn.fhir.jpa.config.r5.JpaR5Config;
@ -34,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.fail;
JpaR5Config.class,
HapiJpaConfig.class,
TestJPAConfig.class,
JpaBatch2Config.class,
Batch2JobsConfig.class,
TestHibernateSearchAddInConfig.DefaultLuceneHeap.class
})
public class TestR5Config {

View File

@ -56,6 +56,7 @@ import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.ISearchParamRegistry;
import ca.uhn.fhir.test.BaseTest;
import ca.uhn.fhir.jpa.util.Batch2JobHelper;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import ca.uhn.fhir.test.utilities.LoggingExtension;
import ca.uhn.fhir.test.utilities.ProxyUtil;
@ -183,6 +184,8 @@ public abstract class BaseJpaTest extends BaseTest {
protected IFulltextSearchSvc myFulltestSearchSvc;
@Autowired(required = false)
protected BatchJobHelper myBatchJobHelper;
@Autowired(required = false)
protected Batch2JobHelper myBatch2JobHelper;
@Autowired
protected ITermConceptDao myTermConceptDao;
@Autowired

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.dao.dstu2;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.entity.ModelConfig;
@ -30,8 +29,6 @@ import ca.uhn.fhir.model.primitive.DecimalDt;
import ca.uhn.fhir.model.primitive.IntegerDt;
import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.NumberParam;
import ca.uhn.fhir.rest.param.ReferenceParam;
@ -40,13 +37,11 @@ import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.internal.util.collections.ListUtil;
import java.util.List;
@ -55,7 +50,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

View File

@ -85,7 +85,6 @@ import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@ -101,7 +100,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
@SuppressWarnings("unchecked")
@ContextConfiguration(classes= TestHibernateSearchAddInConfig.NoFT.class)
@ContextConfiguration(classes = TestHibernateSearchAddInConfig.NoFT.class)
public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoDstu2SearchNoFtTest.class);
@Autowired
@ -154,7 +153,7 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
IIdType moId = myMedicationOrderDao.create(mo, mySrd).getId().toUnqualifiedVersionless();
HttpServletRequest request = mock(HttpServletRequest.class);
IBundleProvider resp = myPatientDao.patientTypeEverything(request, null, null, null, null, null, null, null, mySrd, null);
IBundleProvider resp = myPatientDao.patientTypeEverything(request, null, null, null, null, null, null, null, mySrd, null);
assertThat(toUnqualifiedVersionlessIds(resp), containsInAnyOrder(orgId, medId, patId, moId, patId2));
request = mock(HttpServletRequest.class);
@ -1588,7 +1587,7 @@ public class FhirResourceDaoDstu2SearchNoFtTest extends BaseJpaDstu2Test {
String methodName = "testSearchValueQuantity";
QuantityParam param;
Set<ResourcePersistentId> found;
List<ResourcePersistentId> found;
param = new QuantityParam(ParamPrefixEnum.GREATERTHAN_OR_EQUALS, new BigDecimal("10"), null, null);
found = myObservationDao.searchForIds(new SearchParameterMap("value-quantity", param), null);
int initialSize = found.size();

View File

@ -290,7 +290,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
assertThat(toUnqualifiedVersionlessIdValues(found), hasItem(id2.getValue()));
}
{
Set<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2016-01-02")), null);
List<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2016-01-02")), null);
assertThat(ResourcePersistentId.toLongList(found), not(hasItem(id2.getIdPartAsLong())));
}
}
@ -1507,7 +1507,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
"}\n";
//@formatter:on
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
int initial = val.size();
Organization org = myFhirContext.newJsonParser().parseResource(Organization.class, inputStr);
@ -2698,7 +2698,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
assertThat(str.length(), greaterThan(ResourceIndexedSearchParamString.MAX_LENGTH));
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
int initial = val.size();
myOrganizationDao.create(org, mySrd);
@ -2813,7 +2813,7 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
String subStr1 = longStr1.substring(0, ResourceIndexedSearchParamString.MAX_LENGTH);
String subStr2 = longStr2.substring(0, ResourceIndexedSearchParamString.MAX_LENGTH);
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("type", new IdentifierDt(subStr1, subStr2)), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("type", new IdentifierDt(subStr1, subStr2)), null);
int initial = val.size();
myOrganizationDao.create(org, mySrd);
@ -2840,12 +2840,12 @@ public class FhirResourceDaoDstu2Test extends BaseJpaDstu2Test {
public void testValidateAgainstDstu2Profile() throws Exception {
myDaoConfig.setAllowExternalReferences(true);
String stream = IOUtils.toString(getClass().getResourceAsStream("/binu_testpatient_structuredefinition_dstu2.xml"), StandardCharsets.UTF_8);
String stream = loadResource("/binu_testpatient_structuredefinition_dstu2.xml");
StructureDefinition sd = myFhirContext.newXmlParser().parseResource(StructureDefinition.class, stream);
myStructureDefinitionDao.create(sd, mySrd);
String rawResource = IOUtils.toString(getClass().getResourceAsStream("/binu_testpatient_resource.json"), StandardCharsets.UTF_8);
String rawResource = loadResource("/binu_testpatient_resource.json");
IBaseResource parsedResource = myFhirContext.newJsonParser().parseResource(rawResource);
try {
myPatientDao.validate((Patient) parsedResource, null, rawResource, EncodingEnum.JSON, ValidationModeEnum.UPDATE, null, mySrd);

View File

@ -1,7 +1,6 @@
package ca.uhn.fhir.jpa.dao.dstu2;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
import ca.uhn.fhir.model.api.Tag;
@ -14,11 +13,10 @@ import ca.uhn.fhir.model.dstu2.resource.Patient;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.model.primitive.StringDt;
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.util.TestUtil;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@ -41,7 +39,6 @@ public class FhirResourceDaoDstu2UpdateTest extends BaseJpaDstu2Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoDstu2UpdateTest.class);
@Test
public void testUpdateByUrl() {
String methodName = "testUpdateByUrl";
@ -100,7 +97,7 @@ public class FhirResourceDaoDstu2UpdateTest extends BaseJpaDstu2Test {
conformance.setId("");
myConformanceDao.create(conformance);
assertEquals(1, myConformanceDao.search(new SearchParameterMap().setLoadSynchronous(true)).size().intValue());
assertEquals(1, myConformanceDao.search(new SearchParameterMap().setLoadSynchronous(true)).sizeOrThrowNpe());
}
/**
@ -171,7 +168,7 @@ public class FhirResourceDaoDstu2UpdateTest extends BaseJpaDstu2Test {
p2.addName().addFamily("Tester").addGiven("testUpdateMaintainsSearchParamsDstu2BBB");
myPatientDao.create(p2, mySrd);
Set<ResourcePersistentId> ids = myPatientDao.searchForIds(new SearchParameterMap(Patient.SP_GIVEN, new StringDt("testUpdateMaintainsSearchParamsDstu2AAA")), null);
List<ResourcePersistentId> ids = myPatientDao.searchForIds(new SearchParameterMap(Patient.SP_GIVEN, new StringDt("testUpdateMaintainsSearchParamsDstu2AAA")), null);
assertEquals(1, ids.size());
assertThat(ResourcePersistentId.toLongList(ids), contains(p1id.getIdPartAsLong()));

View File

@ -117,7 +117,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked"})
public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(FhirResourceDaoDstu3Test.class);
@ -161,8 +161,8 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
}
private List<String> extractNames(IBundleProvider theSearch) {
ArrayList<String> retVal = new ArrayList<String>();
for (IBaseResource next : theSearch.getResources(0, theSearch.size())) {
ArrayList<String> retVal = new ArrayList<>();
for (IBaseResource next : theSearch.getResources(0, theSearch.sizeOrThrowNpe())) {
Patient nextPt = (Patient) next;
retVal.add(nextPt.getName().get(0).getNameAsSingleString());
}
@ -187,7 +187,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
}
private List<UriType> sortIds(List<UriType> theProfiles) {
ArrayList<UriType> retVal = new ArrayList<UriType>(theProfiles);
ArrayList<UriType> retVal = new ArrayList<>(theProfiles);
retVal.sort(Comparator.comparing(PrimitiveType::getValue));
return retVal;
}
@ -199,7 +199,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
org.setLanguageElement(new CodeType("EN_ca"));
org.setName(methodName);
ArrayList<Coding> tl = new ArrayList<Coding>();
ArrayList<Coding> tl = new ArrayList<>();
tl.add(new Coding().setSystem(methodName).setCode(methodName));
org.getMeta().getTag().addAll(tl);
@ -254,11 +254,11 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
IIdType id2 = myObservationDao.create(o2, mySrd).getId();
{
Set<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2001-01-02")), null);
List<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2001-01-02")), null);
assertThat(ResourcePersistentId.toLongList(found), hasItem(id2.getIdPartAsLong()));
}
{
Set<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2016-01-02")), null);
List<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2016-01-02")), null);
assertThat(ResourcePersistentId.toLongList(found), not(hasItem(id2.getIdPartAsLong())));
}
}
@ -1914,7 +1914,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
"}\n";
//@formatter:on
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
int initial = val.size();
Organization org = myFhirContext.newJsonParser().parseResource(Organization.class, inputStr);
@ -2533,7 +2533,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
@Test()
public void testSortByComposite() {
IIdType pid0;
IIdType oid1;
IIdType oid2;
@ -2551,54 +2551,54 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
obs.getSubject().setReferenceElement(pid0);
obs.getCode().addCoding().setCode("2345-7").setSystem("http://loinc.org");
obs.setValue(new StringType("200"));
oid1 = myObservationDao.create(obs, mySrd).getId().toUnqualifiedVersionless();
ourLog.info("Observation: \n" + myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(obs));
}
{
Observation obs = new Observation();
obs.addIdentifier().setSystem("urn:system").setValue("FOO");
obs.getSubject().setReferenceElement(pid0);
obs.getCode().addCoding().setCode("2345-7").setSystem("http://loinc.org");
obs.setValue(new StringType("300"));
oid2 = myObservationDao.create(obs, mySrd).getId().toUnqualifiedVersionless();
ourLog.info("Observation: \n" + myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(obs));
}
{
Observation obs = new Observation();
obs.addIdentifier().setSystem("urn:system").setValue("FOO");
obs.getSubject().setReferenceElement(pid0);
obs.getCode().addCoding().setCode("2345-7").setSystem("http://loinc.org");
obs.setValue(new StringType("150"));
oid3 = myObservationDao.create(obs, mySrd).getId().toUnqualifiedVersionless();
ourLog.info("Observation: \n" + myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(obs));
}
{
Observation obs = new Observation();
obs.addIdentifier().setSystem("urn:system").setValue("FOO");
obs.getSubject().setReferenceElement(pid0);
obs.getCode().addCoding().setCode("2345-7").setSystem("http://loinc.org");
obs.setValue(new StringType("250"));
oid4 = myObservationDao.create(obs, mySrd).getId().toUnqualifiedVersionless();
ourLog.info("Observation: \n" + myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(obs));
}
SearchParameterMap pm = new SearchParameterMap();
pm.setSort(new SortSpec(Observation.SP_CODE_VALUE_STRING));
IBundleProvider found = myObservationDao.search(pm);
List<IIdType> list = toUnqualifiedVersionlessIds(found);
assertEquals(4, list.size());
assertEquals(oid3, list.get(0));
@ -3165,7 +3165,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
assertThat(str.length(), greaterThan(ResourceIndexedSearchParamString.MAX_LENGTH));
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
int initial = val.size();
myOrganizationDao.create(org, mySrd);
@ -3266,8 +3266,8 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
assertEquals("http://profile/1", profiles.get(0).getValue());
assertEquals("http://profile/2", profiles.get(1).getValue());
myPatientDao.metaAddOperation(patientId, new Meta().addTag( "http://foo", "Cat", "Kittens"), null);
myPatientDao.metaAddOperation(patientId, new Meta().addTag( "http://foo", "Cow", "Calves"), null);
myPatientDao.metaAddOperation(patientId, new Meta().addTag("http://foo", "Cat", "Kittens"), null);
myPatientDao.metaAddOperation(patientId, new Meta().addTag("http://foo", "Cow", "Calves"), null);
retrieved = myPatientDao.read(patientId, mySrd);
published = (ArrayList<Coding>) retrieved.getMeta().getTag();
@ -3344,7 +3344,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
String subStr1 = longStr1.substring(0, ResourceIndexedSearchParamString.MAX_LENGTH);
String subStr2 = longStr2.substring(0, ResourceIndexedSearchParamString.MAX_LENGTH);
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("type", new TokenParam(subStr1, subStr2)), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("type", new TokenParam(subStr1, subStr2)), null);
int initial = val.size();
myOrganizationDao.create(org, mySrd);
@ -3368,7 +3368,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
}
@Test
public void testUpdateRejectsIdWhichPointsToForcedId() throws InterruptedException {
public void testUpdateRejectsIdWhichPointsToForcedId() {
Patient p1 = new Patient();
p1.addIdentifier().setSystem("urn:system").setValue("testUpdateRejectsIdWhichPointsToForcedId01");
p1.addName().setFamily("Tester").addGiven("testUpdateRejectsIdWhichPointsToForcedId01");
@ -3416,7 +3416,7 @@ public class FhirResourceDaoDstu3Test extends BaseJpaDstu3Test {
}
private static List<String> toStringList(List<UriType> theUriType) {
ArrayList<String> retVal = new ArrayList<String>();
ArrayList<String> retVal = new ArrayList<>();
for (UriType next : theUriType) {
retVal.add(next.getValue());
}

View File

@ -466,7 +466,7 @@ public class FhirResourceDaoDstu3UpdateTest extends BaseJpaDstu3Test {
p2.addName().setFamily("Tester").addGiven("testUpdateMaintainsSearchParamsDstu2BBB");
myPatientDao.create(p2, mySrd).getId();
Set<ResourcePersistentId> ids = myPatientDao.searchForIds(new SearchParameterMap(Patient.SP_GIVEN, new StringParam("testUpdateMaintainsSearchParamsDstu2AAA")), null);
List<ResourcePersistentId> ids = myPatientDao.searchForIds(new SearchParameterMap(Patient.SP_GIVEN, new StringParam("testUpdateMaintainsSearchParamsDstu2AAA")), null);
assertEquals(1, ids.size());
assertThat(ResourcePersistentId.toLongList(ids), contains(p1id.getIdPartAsLong()));

View File

@ -476,11 +476,11 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
IIdType id2 = myObservationDao.create(o2, mySrd).getId();
{
Set<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2001-01-02")), null);
List<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2001-01-02")), null);
assertThat(ResourcePersistentId.toLongList(found), hasItem(id2.getIdPartAsLong()));
}
{
Set<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2016-01-02")), null);
List<ResourcePersistentId> found = myObservationDao.searchForIds(new SearchParameterMap(Observation.SP_DATE, new DateParam(">2016-01-02")), null);
assertThat(ResourcePersistentId.toLongList(found), not(hasItem(id2.getIdPartAsLong())));
}
}
@ -2481,7 +2481,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
"}\n";
//@formatter:on
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
int initial = val.size();
Organization org = myFhirContext.newJsonParser().parseResource(Organization.class, inputStr);
@ -3914,7 +3914,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
assertThat(str.length(), greaterThan(ResourceIndexedSearchParamString.MAX_LENGTH));
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("name", new StringParam("P")), null);
int initial = val.size();
myOrganizationDao.create(org, mySrd);
@ -4093,7 +4093,7 @@ public class FhirResourceDaoR4Test extends BaseJpaR4Test {
String subStr1 = longStr1.substring(0, ResourceIndexedSearchParamString.MAX_LENGTH);
String subStr2 = longStr2.substring(0, ResourceIndexedSearchParamString.MAX_LENGTH);
Set<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("type", new TokenParam(subStr1, subStr2)), null);
List<ResourcePersistentId> val = myOrganizationDao.searchForIds(new SearchParameterMap("type", new TokenParam(subStr1, subStr2)), null);
int initial = val.size();
myOrganizationDao.create(org, mySrd);

View File

@ -724,7 +724,7 @@ public class FhirResourceDaoR4UpdateTest extends BaseJpaR4Test {
p2.addName().setFamily("Tester").addGiven("testUpdateMaintainsSearchParamsDstu2BBB");
myPatientDao.create(p2, mySrd).getId();
Set<ResourcePersistentId> ids = myPatientDao.searchForIds(new SearchParameterMap(Patient.SP_GIVEN, new StringParam("testUpdateMaintainsSearchParamsDstu2AAA")), null);
List<ResourcePersistentId> ids = myPatientDao.searchForIds(new SearchParameterMap(Patient.SP_GIVEN, new StringParam("testUpdateMaintainsSearchParamsDstu2AAA")), null);
assertEquals(1, ids.size());
assertThat(ResourcePersistentId.toLongList(ids), contains(p1id.getIdPartAsLong()));

View File

@ -1,54 +1,26 @@
package ca.uhn.fhir.jpa.delete.job;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.jpa.batch.CommonBatchJobConfig;
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.job.MultiUrlJobParameterUtil;
import ca.uhn.fhir.jpa.batch.reader.CronologicalBatchAllResourcePidReader;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.TemporalPrecisionEnum;
import ca.uhn.fhir.model.primitive.DateTimeDt;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import org.apache.commons.lang3.time.DateUtils;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Observation;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ReindexJobTest extends BaseJpaR4Test {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexJobTest.class);
@Autowired
private IBatchJobSubmitter myBatchJobSubmitter;
@Autowired
@Qualifier(BatchConstants.REINDEX_JOB_NAME)
private Job myReindexJob;
@Autowired
@Qualifier(BatchConstants.REINDEX_EVERYTHING_JOB_NAME)
private Job myReindexEverythingJob;
@Autowired
private BatchJobHelper myBatchJobHelper;
private IJobCoordinator myJobCoordinator;
private ReindexTestHelper myReindexTestHelper;
@ -58,128 +30,61 @@ public class ReindexJobTest extends BaseJpaR4Test {
}
@Test
public void testReindexJob() throws Exception {
public void testReindex_ByUrl() {
// setup
IIdType obsFinalId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
IIdType obsCancelledId = myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED);
myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED);
myReindexTestHelper.createAlleleSearchParameter();
assertEquals(2, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
// The searchparam value is on the observation, but it hasn't been indexed yet
// The search param value is on the observation, but it hasn't been indexed yet
assertThat(myReindexTestHelper.getAlleleObservationIds(), hasSize(0));
// Only reindex one of them
JobParameters jobParameters = MultiUrlJobParameterUtil.buildJobParameters("Observation?status=final");
ReindexJobParameters parameters = new ReindexJobParameters();
parameters.addUrl("Observation?status=final");
// execute
JobExecution jobExecution = myBatchJobSubmitter.runJob(myReindexJob, jobParameters);
myBatchJobHelper.awaitJobCompletion(jobExecution);
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(parameters);
String id = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(id);
// validate
assertEquals(2, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
// Now one of them should be indexed
List<String> alleleObservationIds = myReindexTestHelper.getAlleleObservationIds();
assertThat(alleleObservationIds, hasSize(1));
assertEquals(obsFinalId.getIdPart(), alleleObservationIds.get(0));
}
private long generateAndReturnTimeGap() {
long start_time = System.currentTimeMillis();
sleepUntilTimeChanges();
long end_time = System.currentTimeMillis();
return end_time - start_time;
}
@Test
public void testReindexJobLastUpdatedFilter() throws Exception {
// Given
DaoMethodOutcome T1_Patient = myReindexTestHelper.createEyeColourPatient(true);
long timeGap1 = generateAndReturnTimeGap();
DaoMethodOutcome T3_Patient = myReindexTestHelper.createEyeColourPatient(true);
long timeGap2 = generateAndReturnTimeGap();
DaoMethodOutcome T6_Patient = myReindexTestHelper.createEyeColourPatient(true);
// Setup cutoff
Date firstPatientLastUpdated = T1_Patient.getResource().getMeta().getLastUpdated();
Date secondPatientLastUpdated = T3_Patient.getResource().getMeta().getLastUpdated();
Date T2_Date = DateUtils.addMilliseconds(firstPatientLastUpdated, (int) (timeGap1 / 2));
Date T4_Date = DateUtils.addMilliseconds(secondPatientLastUpdated, (int) (timeGap2 / 2));
ourLog.info("Older lastUpdated: {}", firstPatientLastUpdated);
ourLog.info("Newer lastUpdated: {}", secondPatientLastUpdated);
ourLog.info("Cutoff Lowerbound: {}", T2_Date);
ourLog.info("Cutoff Upperbound: {}", T4_Date);
assertTrue(T2_Date.after(firstPatientLastUpdated));
assertTrue(T2_Date.before(secondPatientLastUpdated));
assertTrue(T4_Date.after(secondPatientLastUpdated));
//Create our new SP.
myReindexTestHelper.createEyeColourSearchParameter();
//There exists 3 patients
assertEquals(3, myPatientDao.search(SearchParameterMap.newSynchronous()).size());
// The searchparam value is on the patient, but it hasn't been indexed yet, so the call to search for all with eye-colour returns 0
assertThat(myReindexTestHelper.getEyeColourPatientIds(), hasSize(0));
// Only reindex one of them
String T2_DateString = new DateTimeDt(T2_Date).setPrecision(TemporalPrecisionEnum.MILLI).getValueAsString();
String T4_DateString = new DateTimeDt(T4_Date).setPrecision(TemporalPrecisionEnum.MILLI).getValueAsString();
JobParameters T3_Patient_JobParams = MultiUrlJobParameterUtil.buildJobParameters("Patient?_lastUpdated=ge" +
T2_DateString + "&_lastUpdated=le" + T4_DateString);
JobParameters T1_Patient_JobParams = MultiUrlJobParameterUtil.buildJobParameters("Patient?_lastUpdated=le" + T2_DateString);
JobParameters T6_Patient_JobParams = MultiUrlJobParameterUtil.buildJobParameters("Patient?_lastUpdated=ge" + T4_DateString);
// execute
JobExecution jobExecution = myBatchJobSubmitter.runJob(myReindexJob, T3_Patient_JobParams);
myBatchJobHelper.awaitJobCompletion(jobExecution);
// Now one of them should be indexed for the eye colour SP
List<String> eyeColourPatientIds = myReindexTestHelper.getEyeColourPatientIds();
assertThat(eyeColourPatientIds, hasSize(1));
assertEquals(T3_Patient.getId().getIdPart(), eyeColourPatientIds.get(0));
// execute
JobExecution jobExecutionT1 = myBatchJobSubmitter.runJob(myReindexJob, T1_Patient_JobParams);
myBatchJobHelper.awaitJobCompletion(jobExecution);
// Now one of them should be indexed for the eye colour SP
eyeColourPatientIds = myReindexTestHelper.getEyeColourPatientIds();
assertThat(eyeColourPatientIds, hasSize(2));
assertThat(eyeColourPatientIds, hasItem(T3_Patient.getId().getIdPart()));
// execute
JobExecution jobExecutionT6 = myBatchJobSubmitter.runJob(myReindexJob, T6_Patient_JobParams);
myBatchJobHelper.awaitJobCompletion(jobExecution);
// Now one of them should be indexed for the eye colour SP
eyeColourPatientIds = myReindexTestHelper.getEyeColourPatientIds();
assertThat(eyeColourPatientIds, hasSize(3));
assertThat(eyeColourPatientIds, hasItem(T6_Patient.getId().getIdPart()));
}
@Test
public void testReindexEverythingJob() throws Exception {
public void testReindex_Everything() {
// setup
for (int i = 0; i < 50; ++i) {
myReindexTestHelper.createObservationWithAlleleExtension(Observation.ObservationStatus.FINAL);
}
sleepUntilTimeChanges();
myReindexTestHelper.createAlleleSearchParameter();
mySearchParamRegistry.forceRefresh();
assertEquals(50, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
// The searchparam value is on the observation, but it hasn't been indexed yet
// The search param value is on the observation, but it hasn't been indexed yet
assertThat(myReindexTestHelper.getAlleleObservationIds(), hasSize(0));
JobParameters jobParameters = buildEverythingJobParameters(3L);
// execute
JobExecution jobExecution = myBatchJobSubmitter.runJob(myReindexEverythingJob, jobParameters);
myBatchJobHelper.awaitJobCompletion(jobExecution);
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
startRequest.setParameters(new ReindexJobParameters());
String id = myJobCoordinator.startInstance(startRequest);
myBatch2JobHelper.awaitJobCompletion(id);
// validate
assertEquals(50, myObservationDao.search(SearchParameterMap.newSynchronous()).size());
@ -187,13 +92,4 @@ public class ReindexJobTest extends BaseJpaR4Test {
assertThat(myReindexTestHelper.getAlleleObservationIds(), hasSize(50));
}
private JobParameters buildEverythingJobParameters(Long theBatchSize) {
Map<String, JobParameter> map = new HashMap<>();
map.put(CronologicalBatchAllResourcePidReader.JOB_PARAM_START_TIME, new JobParameter(DateUtils.addMinutes(new Date(), CommonBatchJobConfig.MINUTES_IN_FUTURE_TO_PROCESS_FROM)));
map.put(CronologicalBatchAllResourcePidReader.JOB_PARAM_BATCH_SIZE, new JobParameter(theBatchSize.longValue()));
JobParameters parameters = new JobParameters(map);
return parameters;
}
}

View File

@ -25,7 +25,7 @@ import ca.uhn.fhir.rest.server.RestfulServer;
import ca.uhn.fhir.rest.server.interceptor.CorsInterceptor;
import ca.uhn.fhir.rest.server.interceptor.ResponseHighlighterInterceptor;
import ca.uhn.fhir.rest.server.provider.DeleteExpungeProvider;
import ca.uhn.fhir.rest.server.provider.ReindexProvider;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexProvider;
import ca.uhn.fhir.test.utilities.JettyUtil;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;

View File

@ -21,6 +21,7 @@ import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.DecimalType;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -128,6 +129,9 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
myTenantClientInterceptor.setTenantId(TENANT_B);
IIdType obsFinalB = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
IIdType obsFinalD = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
reindexTestHelper.createAlleleSearchParameter();
// The searchparam value is on the observation, but it hasn't been indexed yet
@ -135,13 +139,11 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(0));
myTenantClientInterceptor.setTenantId(TENANT_B);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(0));
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(0));
// setup
Parameters input = new Parameters();
Integer batchSize = 2401;
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize));
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_EVERYTHING, new BooleanType(true));
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
// reindex all of Tenant A
myTenantClientInterceptor.setTenantId(TENANT_A);
@ -152,9 +154,9 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
.withParameters(input)
.execute();
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
DecimalType jobId = (DecimalType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatchJobHelper.awaitJobExecution(jobId.getValueAsNumber().longValue());
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
// validate
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
@ -164,6 +166,25 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
assertEquals(obsFinalA.getIdPart(), alleleObservationIds.get(0));
myTenantClientInterceptor.setTenantId(TENANT_B);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(0));
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(0));
// Reindex default partition
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
response = myClient
.operation()
.onServer()
.named(ProviderConstants.OPERATION_REINDEX)
.withParameters(input)
.execute();
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient), hasSize(1));
}
@Test
@ -187,8 +208,6 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
// setup
Parameters input = new Parameters();
Integer batchSize = 2401;
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize));
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_URL, "Observation?status=final");
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
@ -202,9 +221,9 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
.withParameters(input)
.execute();
ourLog.info(myFhirContext.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
DecimalType jobId = (DecimalType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
StringType jobId = (StringType) response.getParameter(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
myBatchJobHelper.awaitJobExecution(jobId.getValueAsNumber().longValue());
myBatch2JobHelper.awaitJobCompletion(jobId.getValue());
// validate
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);

View File

@ -0,0 +1,153 @@
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexChunkIds;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import org.hl7.fhir.r4.model.IdType;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXED;
import static ca.uhn.fhir.jpa.dao.BaseHapiFhirDao.INDEX_STATUS_INDEXING_FAILED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class ReindexStepTest extends BaseJpaR4Test {
@Autowired
private ReindexStep myReindexStep;
@Mock
private IJobDataSink<VoidModel> myDataSink;
@Captor
private ArgumentCaptor<String> myErrorCaptor;
@Test
public void testReindex_NoActionNeeded() {
// Setup
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
// Verify
assertEquals(2, outcome.getRecordsProcessed());
assertEquals(4, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
@Test
public void testReindex_IndexesWereMissing() {
// Setup
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
runInTransaction(() -> {
myResourceIndexedSearchParamStringDao.deleteByResourceId(id0);
myResourceIndexedSearchParamTokenDao.deleteByResourceId(id0);
});
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
// Verify
assertEquals(2, outcome.getRecordsProcessed());
assertEquals(4, myCaptureQueriesListener.logSelectQueries().size());
// name, family, phonetic, deceased, active
assertEquals(5, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
@Test
public void testReindex_OneResourceReindexFailedButOthersSucceeded() {
// Setup
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
Long idPatientToInvalidate = createPatient().getIdPartAsLong();
Long idObservation = createObservation(withSubject(new IdType("Patient/" + idPatientToInvalidate))).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(idPatientToInvalidate.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Observation").setId(idObservation.toString()));
runInTransaction(() -> {
// Swap in some invalid text, which will cause an error when we go to reindex
assertEquals(1, myEntityManager.createNativeQuery("UPDATE HFJ_RES_VER SET RES_TEXT = null WHERE RES_ID = " + idPatientToInvalidate).executeUpdate());
assertEquals(1, myEntityManager.createNativeQuery("UPDATE HFJ_RES_VER SET RES_TEXT_VC = 'ABCDEFG' WHERE RES_ID = " + idPatientToInvalidate).executeUpdate());
// Also set the current index status to errored on one, so it can be reset
assertEquals(1, myEntityManager.createNativeQuery("UPDATE HFJ_RESOURCE SET SP_INDEX_STATUS = 2 WHERE RES_ID = " + id0).executeUpdate());
myResourceIndexedSearchParamStringDao.deleteByResourceId(id0);
myResourceIndexedSearchParamTokenDao.deleteByResourceId(id0);
});
// Execute
myCaptureQueriesListener.clear();
RunOutcome outcome = myReindexStep.doReindex(data, myDataSink, "index-id", "chunk-id");
// Verify
assertEquals(4, outcome.getRecordsProcessed());
assertEquals(6, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(5, myCaptureQueriesListener.countInsertQueries());
assertEquals(2, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
verify(myDataSink, times(1)).recoveredError(myErrorCaptor.capture());
String message = myErrorCaptor.getValue();
message = message.replace("Observation.subject.where(resolve() is Patient)", "Observation.subject"); // depending on whether subject or patient gets indexed first
assertEquals("Failure reindexing Patient/" + idPatientToInvalidate + ": HAPI-0928: Failed to parse database resource[Patient/" + idPatientToInvalidate + " (pid " + idPatientToInvalidate + ", version R4): HAPI-1861: Failed to parse JSON encoded FHIR content: HAPI-1859: Content does not appear to be FHIR JSON, first non-whitespace character was: 'A' (must be '{')", message);
runInTransaction(() -> {
ResourceTable table = myResourceTableDao.findById(idPatientToInvalidate).orElseThrow();
assertEquals(INDEX_STATUS_INDEXING_FAILED, table.getIndexStatus());
table = myResourceTableDao.findById(id0).orElseThrow();
assertEquals(INDEX_STATUS_INDEXED, table.getIndexStatus());
});
}
}

View File

@ -0,0 +1,151 @@
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.dao.r4.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings("unchecked")
@TestMethodOrder(value = MethodOrderer.MethodName.class)
public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
@Autowired
private IResourceReindexSvc mySvc;
@Test
public void testFetchResourceIdsPage_NoUrl_WithData() {
// Setup
createPatient(withActiveFalse());
sleepUntilTimeChanges();
Date start = new Date();
Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
Date beforeLastInRange = new Date();
sleepUntilTimeChanges();
Long id2 = createObservation(withObservationCode("http://foo", "bar")).getIdPartAsLong();
sleepUntilTimeChanges();
Date end = new Date();
sleepUntilTimeChanges();
createPatient(withActiveFalse());
// Execute
myCaptureQueriesListener.clear();
IResourceReindexSvc.IdChunk page = mySvc.fetchResourceIdsPage(start, end, null, null);
// Verify
assertThat(page.getResourceTypes().toString(), page.getResourceTypes(), contains("Patient", "Patient", "Observation"));
assertThat(page.getIds(), contains(new ResourcePersistentId(id0), new ResourcePersistentId(id1), new ResourcePersistentId(id2)));
assertTrue(page.getLastDate().after(beforeLastInRange));
assertTrue(page.getLastDate().before(end));
assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
@Test
public void testFetchResourceIdsPage_NoUrl_NoData() {
// Setup
Date start = new Date();
Date end = new Date();
// Execute
myCaptureQueriesListener.clear();
IResourceReindexSvc.IdChunk page = mySvc.fetchResourceIdsPage(start, end, null, null);
// Verify
assertEquals(0, page.getResourceTypes().size());
assertEquals(0, page.getIds().size());
assertNull(page.getLastDate());
assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
@Test
public void testFetchResourceIdsPage_WithUrl_WithData() {
// Setup
createPatient(withActiveFalse());
sleepUntilTimeChanges();
// Start of resources within range
Date start = new Date();
sleepUntilTimeChanges();
Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
createObservation(withObservationCode("http://foo", "bar"));
createObservation(withObservationCode("http://foo", "bar"));
sleepUntilTimeChanges();
Date beforeLastInRange = new Date();
sleepUntilTimeChanges();
Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChanges();
Date end = new Date();
sleepUntilTimeChanges();
// End of resources within range
createObservation(withObservationCode("http://foo", "bar"));
createPatient(withActiveFalse());
sleepUntilTimeChanges();
// Execute
myCaptureQueriesListener.clear();
IResourceReindexSvc.IdChunk page = mySvc.fetchResourceIdsPage(start, end, null, "Patient?active=false");
// Verify
assertThat(page.getResourceTypes().toString(), page.getResourceTypes(), contains("Patient", "Patient"));
assertThat(page.getIds(), contains(new ResourcePersistentId(id0), new ResourcePersistentId(id1)));
assertTrue(page.getLastDate().after(beforeLastInRange));
assertTrue(page.getLastDate().before(end));
assertEquals(3, myCaptureQueriesListener.logSelectQueries().size());
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
}

View File

@ -1,51 +0,0 @@
package ca.uhn.fhir.jpa.reindex.job;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.search.reindex.ResourceReindexer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class ReindexWriterTest {
@Mock
private DaoConfig myDaoConfig;
@Mock
private PlatformTransactionManager myPlatformTransactionManager;
@Mock
ResourceReindexer myResourceReindexer;
@InjectMocks
private ReindexWriter myReindexWriter;
@Test
public void testReindexSplitsPidList() throws Exception {
when(myDaoConfig.getReindexBatchSize()).thenReturn(5);
when(myDaoConfig.getReindexThreadCount()).thenReturn(4);
List<Long> pidList = new ArrayList<>();
int count = 20;
for (long i = 0; i < count; ++i) {
pidList.add(i);
}
List<List<Long>> pidListList = new ArrayList<>();
pidListList.add(pidList);
myReindexWriter.write(pidListList);
verify(myResourceReindexer, times(count)).readAndReindexResourceByPid(anyLong());
verifyNoMoreInteractions(myResourceReindexer);
}
}

View File

@ -0,0 +1,28 @@
package ca.uhn.fhir.jpa.util;
import ca.uhn.fhir.batch2.api.IJobCleanerService;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.model.StatusEnum;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.equalTo;
public class Batch2JobHelper {
@Autowired
private IJobCleanerService myJobCleanerService;
@Autowired
private IJobCoordinator myJobCoordinator;
public void awaitJobCompletion(String theId) {
await().until(() -> {
myJobCleanerService.runCleanupPass();
return myJobCoordinator.getInstance(theId).getStatus();
}, equalTo(StatusEnum.COMPLETED));
}
}

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -73,9 +73,9 @@ import java.util.stream.Collectors;
@Indexed(routingBinder= @RoutingBinderRef(type = ResourceTableRoutingBinder.class))
@Entity
@Table(name = "HFJ_RESOURCE", uniqueConstraints = {}, indexes = {
// Do not reuse previously used index name: IDX_INDEXSTATUS
@Index(name = "IDX_RES_DATE", columnList = "RES_UPDATED"),
@Index(name = "IDX_RES_TYPE", columnList = "RES_TYPE"),
@Index(name = "IDX_INDEXSTATUS", columnList = "SP_INDEX_STATUS")
})
@NamedEntityGraph(name = "Resource.noJoins")
public class ResourceTable extends BaseHasResource implements Serializable, IBasePersistedResource, IResourceLookup {

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -195,14 +195,6 @@
</webApp>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. -->
<plugin>

View File

@ -1,5 +1,6 @@
package ca.uhn.fhirtest;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexProvider;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.support.IValidationSupport;
@ -268,6 +269,12 @@ public class TestRestfulServer extends RestfulServer {
*/
registerProvider(myAppCtx.getBean(BulkDataExportProvider.class));
/*
* $reindex
*/
registerProvider(myAppCtx.getBean(ReindexProvider.class));
/*
* $diff operation
*/

View File

@ -1,6 +1,8 @@
package ca.uhn.fhirtest.config;
import ca.uhn.fhir.batch2.jobs.config.Batch2JobsConfig;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.jpa.batch2.JpaBatch2Config;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.match.config.SubscriptionProcessorConfig;
@ -19,7 +21,9 @@ import org.springframework.context.annotation.Import;
WebsocketDispatcherConfig.class,
SubscriptionChannelConfig.class,
SubscriptionProcessorConfig.class,
SubscriptionSubmitterConfig.class
SubscriptionSubmitterConfig.class,
JpaBatch2Config.class,
Batch2JobsConfig.class
})
public class CommonConfig {

View File

@ -43,9 +43,7 @@ import java.util.concurrent.TimeUnit;
@Import({CommonConfig.class, JpaDstu3Config.class, HapiJpaConfig.class})
@EnableTransactionManagement()
public class TestDstu3Config {
public static final String FHIR_DB_USERNAME = "${fhir.db.username}";
public static final String FHIR_DB_PASSWORD = "${fhir.db.password}";
public static final String FHIR_LUCENE_LOCATION_DSTU3 = "${fhir.lucene.location.dstu3}";
public static final String FHIR_LUCENE_LOCATION_DSTU3 = "fhir.lucene.location.dstu3";
private String myDbUsername = System.getProperty(TestR5Config.FHIR_DB_USERNAME);
private String myDbPassword = System.getProperty(TestR5Config.FHIR_DB_PASSWORD);

View File

@ -43,9 +43,7 @@ import java.util.concurrent.TimeUnit;
@Import({CommonConfig.class, JpaR4Config.class, HapiJpaConfig.class})
@EnableTransactionManagement()
public class TestR4Config {
public static final String FHIR_DB_USERNAME = "${fhir.db.username}";
public static final String FHIR_DB_PASSWORD = "${fhir.db.password}";
public static final String FHIR_LUCENE_LOCATION_R4 = "${fhir.lucene.location.r4}";
public static final String FHIR_LUCENE_LOCATION_R4 = "fhir.lucene.location.r4";
public static final Integer COUNT_SEARCH_RESULTS_UP_TO = 50000;
private String myDbUsername = System.getProperty(TestR5Config.FHIR_DB_USERNAME);

View File

@ -45,9 +45,9 @@ import java.util.concurrent.TimeUnit;
@Import({CommonConfig.class, JpaR5Config.class, HapiJpaConfig.class})
@EnableTransactionManagement()
public class TestR5Config {
public static final String FHIR_DB_USERNAME = "${fhir.db.username}";
public static final String FHIR_DB_PASSWORD = "${fhir.db.password}";
public static final String FHIR_LUCENE_LOCATION_R5 = "${fhir.lucene.location.r5}";
public static final String FHIR_DB_USERNAME = "fhir.db.username";
public static final String FHIR_DB_PASSWORD = "fhir.db.password";
public static final String FHIR_LUCENE_LOCATION_R5 = "fhir.lucene.location.r5";
public static final Integer COUNT_SEARCH_RESULTS_UP_TO = 50000;
private static final Logger ourLog = LoggerFactory.getLogger(TestR5Config.class);
private String myDbUsername = System.getProperty(TestR5Config.FHIR_DB_USERNAME);

View File

@ -12,6 +12,8 @@ import org.hl7.fhir.dstu3.model.Subscription.SubscriptionChannelType;
import org.hl7.fhir.dstu3.model.Subscription.SubscriptionStatus;
import org.hl7.fhir.instance.model.api.IIdType;
import static ca.uhn.fhirtest.config.TestDstu3Config.FHIR_LUCENE_LOCATION_DSTU3;
public class UhnFhirTestApp {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(UhnFhirTestApp.class);

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -33,7 +33,6 @@ import java.util.Optional;
* a Long, a String, or something else.
*/
public class ResourcePersistentId {
private static final String RESOURCE_PID = "RESOURCE_PID";
private Object myId;
private Long myVersion;
private IIdType myAssociatedResourceId;
@ -91,6 +90,9 @@ public class ResourcePersistentId {
}
public Long getIdAsLong() {
if (myId instanceof String) {
return Long.parseLong((String) myId);
}
return (Long) myId;
}

View File

@ -157,7 +157,7 @@ public class ProviderConstants {
public static final String OPERATION_BATCH_RESPONSE_JOB_ID = "jobId";
/**
* Operation name for the $delete-expunge operation
* Operation name for the $reindex operation
*/
public static final String OPERATION_REINDEX = "$reindex";

View File

@ -1,86 +0,0 @@
package ca.uhn.fhir.rest.server.provider;
/*-
* #%L
* HAPI FHIR - Server Framework
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.annotation.IdParam;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IReindexJobSubmitter;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.util.ParametersUtil;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;
public class ReindexProvider {
private final FhirContext myFhirContext;
private final IReindexJobSubmitter myReindexJobSubmitter;
private final MultiUrlProcessor myMultiUrlProcessor;
public ReindexProvider(FhirContext theFhirContext, IReindexJobSubmitter theReindexJobSubmitter) {
myFhirContext = theFhirContext;
myMultiUrlProcessor = new MultiUrlProcessor(theFhirContext, theReindexJobSubmitter);
myReindexJobSubmitter = theReindexJobSubmitter;
}
@Operation(name = ProviderConstants.OPERATION_REINDEX, idempotent = false)
public IBaseParameters Reindex(
@OperationParam(name = ProviderConstants.OPERATION_REINDEX_PARAM_URL, typeName = "string", min = 0, max = 1) List<IPrimitiveType<String>> theUrlsToReindex,
@OperationParam(name = ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, typeName = "decimal", min = 0, max = 1) IPrimitiveType<BigDecimal> theBatchSize,
@OperationParam(name = ProviderConstants.OPERATION_REINDEX_PARAM_EVERYTHING, typeName = "boolean", min = 0, max = 1) IPrimitiveType<Boolean> theEverything,
RequestDetails theRequestDetails
) {
boolean everything = theEverything != null && theEverything.getValue();
@Nullable Integer batchSize = myMultiUrlProcessor.getBatchSize(theBatchSize);
if (everything) {
return processEverything(batchSize, theRequestDetails);
} else if (theUrlsToReindex != null && !theUrlsToReindex.isEmpty()) {
List<String> urls = theUrlsToReindex.stream().map(IPrimitiveType::getValue).collect(Collectors.toList());
return myMultiUrlProcessor.processUrls(urls, batchSize, theRequestDetails);
} else {
throw new InvalidRequestException(Msg.code(318) + ProviderConstants.OPERATION_REINDEX + " must specify either everything=true or provide at least one value for " + ProviderConstants.OPERATION_REINDEX_PARAM_URL);
}
}
private IBaseParameters processEverything(Integer theBatchSize, RequestDetails theRequestDetails) {
try {
JobExecution jobExecution = myReindexJobSubmitter.submitEverythingJob(theBatchSize, theRequestDetails);
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
ParametersUtil.addParameterToParametersLong(myFhirContext, retVal, ProviderConstants.OPERATION_BATCH_RESPONSE_JOB_ID, jobExecution.getJobId());
return retVal;
} catch (JobParametersInvalidException e) {
throw new InvalidRequestException(Msg.code(319) + "Invalid job parameters: " + e.getMessage(), e);
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@ -123,16 +123,6 @@
</executions>
</plugin>
<!-- Tell Maven which Java source version you want to use -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!-- The configuration here tells the WAR plugin to include the FHIR Tester overlay. You can omit it if you are not using that feature. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.0.0-PRE6-SNAPSHOT</version>
<version>6.0.0-PRE8-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -20,14 +20,16 @@ package ca.uhn.fhir.batch2.jobs.config;
* #L%
*/
import ca.uhn.fhir.batch2.jobs.imprt.BulkImport2AppCtx;
import ca.uhn.fhir.batch2.jobs.imprt.BulkImportAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
//When you define a new batch job, add it here.
@Configuration
@Import({
BulkImport2AppCtx.class
BulkImportAppCtx.class,
ReindexAppCtx.class
})
public class Batch2JobsConfig {
// nothing

View File

@ -159,7 +159,7 @@ public class BulkDataImportProvider {
}
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(BulkImport2AppCtx.JOB_BULK_IMPORT_PULL);
request.setJobDefinitionId(BulkImportAppCtx.JOB_BULK_IMPORT_PULL);
request.setParameters(jobParameters);
ourLog.info("Requesting Bulk Import Job ($import by Manifest) with {} urls", typeAndUrls.size());

View File

@ -27,7 +27,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BulkImport2AppCtx {
public class BulkImportAppCtx {
public static final String JOB_BULK_IMPORT_PULL = "BULK_IMPORT_PULL";
public static final int PARAM_MAXIMUM_BATCH_SIZE_DEFAULT = 800; // Avoid the 1000 SQL param limit
@ -48,7 +48,6 @@ public class BulkImport2AppCtx {
.addLastStep(
"process-files",
"Process files",
NdJsonFileJson.class,
bulkImport2ConsumeFiles())
.build();
}

View File

@ -32,6 +32,10 @@ import javax.validation.constraints.Size;
import java.util.ArrayList;
import java.util.List;
/**
* This class is the parameters model object for starting a
* bulk import job.
*/
public class BulkImportJobParameters implements IModelJson {
@JsonProperty(value = "ndJsonUrls", required = true)

View File

@ -23,6 +23,7 @@ package ca.uhn.fhir.batch2.jobs.imprt;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.ILastJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.context.FhirContext;
@ -48,6 +49,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
@ -70,8 +72,9 @@ public class ConsumeFilesStep implements ILastJobStepWorker<BulkImportJobParamet
@Autowired
private IFhirSystemDao<?, ?> mySystemDao;
@Nonnull
@Override
public RunOutcome run(StepExecutionDetails<BulkImportJobParameters, NdJsonFileJson> theStepExecutionDetails, IJobDataSink<VoidModel> theDataSink) {
public RunOutcome run(@Nonnull StepExecutionDetails<BulkImportJobParameters, NdJsonFileJson> theStepExecutionDetails, @Nonnull IJobDataSink<VoidModel> theDataSink) {
String ndjson = theStepExecutionDetails.getData().getNdJsonText();
String sourceName = theStepExecutionDetails.getData().getSourceName();

View File

@ -22,8 +22,8 @@ package ca.uhn.fhir.batch2.jobs.imprt;
import ca.uhn.fhir.batch2.api.IFirstJobStepWorker;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.i18n.Msg;
@ -42,6 +42,7 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -53,14 +54,15 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class FetchFilesStep implements IFirstJobStepWorker<BulkImportJobParameters, NdJsonFileJson> {
private static final Logger ourLog = LoggerFactory.getLogger(FetchFilesStep.class);
@Nonnull
@Override
public RunOutcome run(StepExecutionDetails<BulkImportJobParameters, VoidModel> theStepExecutionDetails, IJobDataSink<NdJsonFileJson> theDataSink) {
public RunOutcome run(@Nonnull StepExecutionDetails<BulkImportJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<NdJsonFileJson> theDataSink) {
Integer maxBatchResourceCount = theStepExecutionDetails
.getParameters()
.getMaxBatchResourceCount();
if (maxBatchResourceCount == null || maxBatchResourceCount <= 0) {
maxBatchResourceCount = BulkImport2AppCtx.PARAM_MAXIMUM_BATCH_SIZE_DEFAULT;
maxBatchResourceCount = BulkImportAppCtx.PARAM_MAXIMUM_BATCH_SIZE_DEFAULT;
}
try (CloseableHttpClient httpClient = newHttpClient(theStepExecutionDetails)) {

View File

@ -0,0 +1,72 @@
package ca.uhn.fhir.batch2.jobs.reindex;
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.IFirstJobStepWorker;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.Date;
public class GenerateRangeChunksStep implements IFirstJobStepWorker<ReindexJobParameters, ReindexChunkRange> {
private static final Logger ourLog = LoggerFactory.getLogger(GenerateRangeChunksStep.class);
@Autowired
private IResourceReindexSvc myResourceReindexSvc;
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexChunkRange> theDataSink) throws JobExecutionFailedException {
ReindexJobParameters params = theStepExecutionDetails.getParameters();
Date start = new InstantType("2000-01-01T00:00:00Z").getValue();
Date end = new Date();
if (params.getUrl().isEmpty()) {
ourLog.info("Initiating reindex of All Resources from {} to {}", start, end);
ReindexChunkRange nextRange = new ReindexChunkRange();
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
} else {
for (String nextUrl : params.getUrl()) {
ourLog.info("Initiating reindex of [{}]] from {} to {}", nextUrl, start, end);
ReindexChunkRange nextRange = new ReindexChunkRange();
nextRange.setUrl(nextUrl);
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
}
}
return RunOutcome.SUCCESS;
}
}

View File

@ -0,0 +1,139 @@
package ca.uhn.fhir.batch2.jobs.reindex;
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, ReindexChunkRange, ReindexChunkIds> {
private static final Logger ourLog = LoggerFactory.getLogger(LoadIdsStep.class);
@Autowired
private IResourceReindexSvc myResourceReindexSvc;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private MatchUrlService myMatchUrlService;
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexChunkRange> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexChunkIds> theDataSink) throws JobExecutionFailedException {
ReindexChunkRange data = theStepExecutionDetails.getData();
Date start = data.getStart();
Date end = data.getEnd();
ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end);
Date nextStart = start;
RequestPartitionId requestPartitionId = theStepExecutionDetails.getParameters().getRequestPartitionId();
Set<ReindexChunkIds.Id> idBuffer = new HashSet<>();
long previousLastTime = 0L;
int totalIdsFound = 0;
int chunkCount = 0;
while (true) {
String url = theStepExecutionDetails.getData().getUrl();
ourLog.info("Fetching resource ID chunk for URL {} - Range {} - {}", url, nextStart, end);
IResourceReindexSvc.IdChunk nextChunk = myResourceReindexSvc.fetchResourceIdsPage(nextStart, end, requestPartitionId, url);
if (nextChunk.getIds().isEmpty()) {
ourLog.info("No data returned");
break;
}
ourLog.info("Found {} IDs from {} to {}", nextChunk.getIds().size(), nextStart, nextChunk.getLastDate());
// If we get the same last time twice in a row, we've clearly reached the end
if (nextChunk.getLastDate().getTime() == previousLastTime) {
ourLog.info("Matching final timestamp of {}, loading is completed", new Date(previousLastTime));
break;
}
previousLastTime = nextChunk.getLastDate().getTime();
for (int i = 0; i < nextChunk.getIds().size(); i++) {
ReindexChunkIds.Id nextId = new ReindexChunkIds.Id();
nextId.setResourceType(nextChunk.getResourceTypes().get(i));
nextId.setId(nextChunk.getIds().get(i).getId().toString());
idBuffer.add(nextId);
}
nextStart = nextChunk.getLastDate();
if (idBuffer.size() >= 1000) {
List<ReindexChunkIds.Id> submissionIds = new ArrayList<>();
for (Iterator<ReindexChunkIds.Id> iter = idBuffer.iterator(); iter.hasNext(); ) {
submissionIds.add(iter.next());
iter.remove();
if (submissionIds.size() >= 1000) {
break;
}
}
totalIdsFound += submissionIds.size();
chunkCount++;
submitWorkChunk(submissionIds, theDataSink);
}
}
totalIdsFound += idBuffer.size();
chunkCount++;
submitWorkChunk(idBuffer, theDataSink);
ourLog.info("Submitted {} chunks with {} resource IDs", chunkCount, totalIdsFound);
return RunOutcome.SUCCESS;
}
private void submitWorkChunk(Collection<ReindexChunkIds.Id> theIdBuffer, IJobDataSink<ReindexChunkIds> theDataSink) {
if (theIdBuffer.isEmpty()) {
return;
}
ourLog.info("Submitting work chunk with {} IDs", theIdBuffer.size());
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().addAll(theIdBuffer);
theDataSink.accept(data);
}
}

Some files were not shown because too many files have changed in this diff Show More