mdm clear move to batch2 (#3671)

* begin with failing test

* continue building out

* continue building out

* reworking chunk api

* replace ChunkId zippers with proper iterator

* progressing test

* yay test passes!

* switch to idchunk interface

* fixmes

* compiles

* fixme

* fixme

* rename classes

* rename json classes with json suffix

* work towards homogeneous chunks

* fixme

* fixme

* unit test

* fixme

* fixme

* pull out builder

* moar tests

* moar tests

* preserve order

* moar test

* moar test

* fixme

* fixme -> wip to start ci

* msg code

* msg code fix test

* fixme

* change provider implementation

* final WIPs

* IT failing

* IT passes

* java 11

* fix test

* fix test

* fix test

* changelog

* use batch size if provided

* last WIP

* fix test

* fix test

* fix test

* whitespace change to trigger new build

* fix test

* bumping hapi version

* bumping hapi version

* fix test

* review feedback

Co-authored-by: Ken Stevens <ken@Kens-MacBook-Pro.local>
Co-authored-by: Ken Stevens <ken@smilecdr.com>
This commit is contained in:
Ken Stevens 2022-06-25 20:26:44 -04:00 committed by GitHub
parent fe99c36d97
commit 49d298dba1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
151 changed files with 2277 additions and 571 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -25,7 +25,7 @@ public final class Msg {
/**
* IMPORTANT: Please update the following comment after you add a new code
* Last code value: 2099
* Last code value: 2100
*/
private Msg() {}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,4 @@
---
type: change
issue: 3671
title: "Converted mdm clear batch job from Spring Batch to Batch2"

View File

@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.batch;
*/
import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.mdm.job.MdmClearJobConfig;
import ca.uhn.fhir.jpa.batch.mdm.batch.job.MdmClearJobConfig;
import ca.uhn.fhir.jpa.batch.svc.BatchJobSubmitterImpl;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.bulk.imprt.job.BulkImportJobConfig;
@ -51,6 +51,10 @@ import org.springframework.context.annotation.Import;
TermCodeSystemVersionDeleteJobConfig.class
// When you define a new batch job, add it here.
})
@Deprecated
/**
* @deprecated Use Batch2JobsConfig
*/
public class BatchJobsConfig {
@Bean
public IBatchJobSubmitter batchJobSubmitter() {

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.batch.mdm;
package ca.uhn.fhir.jpa.batch.mdm.batch;
/*-
* #%L
@ -29,7 +29,7 @@ import ca.uhn.fhir.jpa.batch.api.IBatchJobSubmitter;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.job.PartitionedUrlValidator;
import ca.uhn.fhir.jpa.batch.job.model.RequestListJson;
import ca.uhn.fhir.jpa.batch.mdm.job.ReverseCronologicalBatchMdmLinkPidReader;
import ca.uhn.fhir.jpa.batch.mdm.batch.job.ReverseCronologicalBatchMdmLinkPidReader;
import ca.uhn.fhir.mdm.api.IMdmClearJobSubmitter;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.batch.mdm.job;
package ca.uhn.fhir.jpa.batch.mdm.batch.job;
/*-
* #%L
@ -30,7 +30,6 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.bulk.export.job;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.RuntimeSearchParam;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;

View File

@ -14,7 +14,7 @@ import ca.uhn.fhir.jpa.api.svc.ISearchCoordinatorSvc;
import ca.uhn.fhir.jpa.batch.BatchJobsConfig;
import ca.uhn.fhir.jpa.batch.config.BatchConstants;
import ca.uhn.fhir.jpa.batch.job.PartitionedUrlValidator;
import ca.uhn.fhir.jpa.batch.mdm.MdmClearJobSubmitterImpl;
import ca.uhn.fhir.jpa.batch.mdm.batch.MdmClearJobSubmitterImpl;
import ca.uhn.fhir.jpa.batch.reader.BatchResourceSearcher;
import ca.uhn.fhir.jpa.binary.interceptor.BinaryStorageInterceptor;
import ca.uhn.fhir.jpa.binary.provider.BinaryAccessProvider;

View File

@ -684,7 +684,7 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
TransactionDetails transactionDetails = new TransactionDetails();
List<ResourceTable> deletedResources = new ArrayList<>();
for (ResourcePersistentId pid : theResourceIds) {
ResourceTable entity = myEntityManager.find(ResourceTable.class, pid.getId());
ResourceTable entity = myEntityManager.find(ResourceTable.class, pid.getIdAsLong());
deletedResources.add(entity);
T resourceToDelete = toResource(myResourceType, entity, null, false);

View File

@ -43,15 +43,19 @@ public interface IMdmLinkDao extends JpaRepository<MdmLink, Long>, IHapiFhirJpaR
@Query("DELETE FROM MdmLink f WHERE (myGoldenResourcePid = :pid OR mySourcePid = :pid) AND myMatchResult <> :matchResult")
int deleteWithAnyReferenceToPidAndMatchResultNot(@Param("pid") Long thePid, @Param("matchResult") MdmMatchResultEnum theMatchResult);
@Modifying
@Query("DELETE FROM MdmLink f WHERE myGoldenResourcePid IN (:goldenPids) OR mySourcePid IN (:goldenPids)")
void deleteLinksWithAnyReferenceToPids(@Param("goldenPids") List<Long> theResourcePids);
@Query("SELECT ml2.myGoldenResourcePid as goldenPid, ml2.mySourcePid as sourcePid FROM MdmLink ml2 " +
"WHERE ml2.myMatchResult=:matchResult " +
"AND ml2.myGoldenResourcePid IN (" +
"SELECT ml.myGoldenResourcePid FROM MdmLink ml " +
"INNER JOIN ResourceLink hrl " +
"ON hrl.myTargetResourcePid=ml.mySourcePid " +
"AND hrl.mySourceResourcePid=:groupPid " +
"AND hrl.mySourcePath='Group.member.entity' " +
"AND hrl.myTargetResourceType='Patient'" +
"SELECT ml.myGoldenResourcePid FROM MdmLink ml " +
"INNER JOIN ResourceLink hrl " +
"ON hrl.myTargetResourcePid=ml.mySourcePid " +
"AND hrl.mySourceResourcePid=:groupPid " +
"AND hrl.mySourcePath='Group.member.entity' " +
"AND hrl.myTargetResourceType='Patient'" +
")")
List<MdmPidTuple> expandPidsFromGroupPidGivenMatchResult(@Param("groupPid") Long theGroupPid, @Param("matchResult") MdmMatchResultEnum theMdmMatchResultEnum);
@ -60,6 +64,7 @@ public interface IMdmLinkDao extends JpaRepository<MdmLink, Long>, IHapiFhirJpaR
interface MdmPidTuple {
Long getGoldenPid();
Long getSourcePid();
}

View File

@ -68,9 +68,9 @@ import ca.uhn.fhir.jpa.model.entity.ResourceTag;
import ca.uhn.fhir.jpa.model.entity.SearchParamPresentEntity;
import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -25,6 +25,10 @@ import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.pid.EmptyResourcePidList;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.MixedResourcePidList;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
@ -43,7 +47,6 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@ -69,17 +72,16 @@ public class ResourceReindexSvcImpl implements IResourceReindexSvc {
@Override
@Transactional
public IdChunk fetchResourceIdsPage(Date theStart, Date theEnd, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
public IResourcePidList fetchResourceIdsPage(Date theStart, Date theEnd, @Nonnull Integer thePageSize, @Nullable RequestPartitionId theRequestPartitionId, @Nullable String theUrl) {
int pageSize = 20000;
if (theUrl == null) {
return fetchResourceIdsPageNoUrl(theStart, theEnd, pageSize, theRequestPartitionId);
return fetchResourceIdsPageNoUrl(theStart, theEnd, thePageSize, theRequestPartitionId);
} else {
return fetchResourceIdsPageWithUrl(theStart, theEnd, pageSize, theUrl, theRequestPartitionId);
return fetchResourceIdsPageWithUrl(theStart, theEnd, thePageSize, theUrl, theRequestPartitionId);
}
}
private IdChunk fetchResourceIdsPageWithUrl(Date theStart, Date theEnd, int thePageSize, String theUrl, RequestPartitionId theRequestPartitionId) {
private IResourcePidList fetchResourceIdsPageWithUrl(Date theStart, Date theEnd, int thePageSize, String theUrl, RequestPartitionId theRequestPartitionId) {
String resourceType = theUrl.substring(0, theUrl.indexOf('?'));
RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(resourceType);
@ -95,22 +97,16 @@ public class ResourceReindexSvcImpl implements IResourceReindexSvc {
request.setRequestPartitionId(theRequestPartitionId);
List<ResourcePersistentId> ids = dao.searchForIds(searchParamMap, request);
// just a list of the same size where every element is the same resource type
List<String> resourceTypes = ids
.stream()
.map(t -> resourceType)
.collect(Collectors.toList());
Date lastDate = null;
if (ids.size() > 0) {
lastDate = dao.readByPid(ids.get(ids.size() - 1)).getMeta().getLastUpdated();
}
return new IdChunk(ids, resourceTypes, lastDate);
return new HomogeneousResourcePidList(resourceType, ids, lastDate);
}
@Nonnull
private IdChunk fetchResourceIdsPageNoUrl(Date theStart, Date theEnd, int thePagesize, RequestPartitionId theRequestPartitionId) {
private IResourcePidList fetchResourceIdsPageNoUrl(Date theStart, Date theEnd, int thePagesize, RequestPartitionId theRequestPartitionId) {
Pageable page = Pageable.ofSize(thePagesize);
Slice<Object[]> slice;
if (theRequestPartitionId == null || theRequestPartitionId.isAllPartitions()) {
@ -123,7 +119,7 @@ public class ResourceReindexSvcImpl implements IResourceReindexSvc {
List<Object[]> content = slice.getContent();
if (content.isEmpty()) {
return new IdChunk(Collections.emptyList(), Collections.emptyList(), null);
return new EmptyResourcePidList();
}
List<ResourcePersistentId> ids = content
@ -138,6 +134,6 @@ public class ResourceReindexSvcImpl implements IResourceReindexSvc {
Date lastDate = (Date) content.get(content.size() - 1)[2];
return new IdChunk(ids, types, lastDate);
return new MixedResourcePidList(types, ids, lastDate);
}
}

View File

@ -20,8 +20,8 @@ package ca.uhn.fhir.jpa.search.elastic;
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.search.lastn.ElasticsearchRestClientFactory;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -29,12 +29,12 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.common.settings.Settings;
import org.hibernate.search.backend.elasticsearch.cfg.ElasticsearchBackendSettings;
import org.hibernate.search.backend.elasticsearch.cfg.ElasticsearchIndexSettings;
import org.hibernate.search.backend.elasticsearch.index.IndexStatus;
import org.hibernate.search.engine.cfg.BackendSettings;
import org.hibernate.search.mapper.orm.automaticindexing.session.AutomaticIndexingSynchronizationStrategyNames;
import org.hibernate.search.mapper.orm.cfg.HibernateOrmMapperSettings;
import org.hibernate.search.backend.elasticsearch.cfg.ElasticsearchBackendSettings;
import org.hibernate.search.backend.elasticsearch.cfg.ElasticsearchIndexSettings;
import org.hibernate.search.mapper.orm.schema.management.SchemaManagementStrategyName;
import org.slf4j.Logger;

View File

@ -140,7 +140,6 @@ import org.hl7.fhir.r4.model.codesystems.ConceptSubsumptionOutcome;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@ -31,6 +31,11 @@
<artifactId>hapi-fhir-jpaserver-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-storage-mdm</artifactId>
<version>${project.version}</version>
</dependency>
<!-- test -->
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>

View File

@ -21,7 +21,7 @@ package ca.uhn.fhir.jpa.mdm.config;
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.batch.mdm.MdmBatchJobSubmitterFactoryImpl;
import ca.uhn.fhir.jpa.batch.mdm.batch.MdmBatchJobSubmitterFactoryImpl;
import ca.uhn.fhir.jpa.dao.mdm.MdmLinkDeleteSvc;
import ca.uhn.fhir.jpa.interceptor.MdmSearchExpandingInterceptor;
import ca.uhn.fhir.mdm.api.IMdmBatchJobSubmitterFactory;
@ -52,5 +52,4 @@ public class MdmCommonConfig {
MdmLinkDeleteSvc mdmLinkDeleteSvc() {
return new MdmLinkDeleteSvc();
}
}

View File

@ -22,6 +22,7 @@ package ca.uhn.fhir.jpa.mdm.config;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import ca.uhn.fhir.jpa.mdm.broker.MdmMessageHandler;
import ca.uhn.fhir.jpa.mdm.broker.MdmMessageKeySvc;
import ca.uhn.fhir.jpa.mdm.broker.MdmQueueConsumerLoader;
@ -30,6 +31,7 @@ import ca.uhn.fhir.jpa.mdm.dao.MdmLinkFactory;
import ca.uhn.fhir.jpa.mdm.interceptor.IMdmStorageInterceptor;
import ca.uhn.fhir.jpa.mdm.interceptor.MdmStorageInterceptor;
import ca.uhn.fhir.jpa.mdm.svc.GoldenResourceMergerSvcImpl;
import ca.uhn.fhir.jpa.mdm.svc.GoldenResourceSearchSvcImpl;
import ca.uhn.fhir.jpa.mdm.svc.IMdmModelConverterSvc;
import ca.uhn.fhir.jpa.mdm.svc.MdmControllerSvcImpl;
import ca.uhn.fhir.jpa.mdm.svc.MdmEidUpdateService;
@ -62,6 +64,7 @@ import ca.uhn.fhir.mdm.api.IMdmLinkUpdaterSvc;
import ca.uhn.fhir.mdm.api.IMdmMatchFinderSvc;
import ca.uhn.fhir.mdm.api.IMdmSettings;
import ca.uhn.fhir.mdm.api.IMdmSurvivorshipService;
import ca.uhn.fhir.mdm.batch2.MdmBatch2Config;
import ca.uhn.fhir.mdm.log.Logs;
import ca.uhn.fhir.mdm.provider.MdmControllerHelper;
import ca.uhn.fhir.mdm.provider.MdmProviderLoader;
@ -76,7 +79,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
@Import(MdmCommonConfig.class)
@Import({MdmCommonConfig.class, MdmBatch2Config.class})
public class MdmConsumerConfig {
private static final Logger ourLog = Logs.getMdmTroubleshootingLog();
@ -263,4 +266,9 @@ public class MdmConsumerConfig {
@Bean
MdmPartitionHelper mdmPartitionHelper() {return new MdmPartitionHelper();}
@Bean
public IGoldenResourceSearchSvc goldenResourceSearchSvc() {
return new GoldenResourceSearchSvcImpl();
}
}

View File

@ -34,6 +34,7 @@ import ca.uhn.fhir.mdm.log.Logs;
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.rest.api.Constants;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
@ -386,4 +387,14 @@ public class MdmLinkDaoSvc {
myJpaIdHelperService.getPidOrNull(theGoldenResource),
myJpaIdHelperService.getPidOrNull(theSourceResource));
}
@Transactional(propagation = Propagation.MANDATORY)
public void deleteLinksWithAnyReferenceToPids(List<Long> theGoldenResourcePids) {
// Split into chunks of 500 so older versions of Oracle don't run into issues (500 = 1000 / 2 since the dao
// method uses the list twice in the sql predicate)
List<List<Long>> chunks = ListUtils.partition(theGoldenResourcePids, 500);
for (List<Long> chunk : chunks) {
myMdmLinkDao.deleteLinksWithAnyReferenceToPids(chunk);
}
}
}

View File

@ -0,0 +1,89 @@
package ca.uhn.fhir.jpa.mdm.svc;
/*-
* #%L
* HAPI FHIR JPA Server
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.svc.IGoldenResourceSearchSvc;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.mdm.api.MdmConstants;
import ca.uhn.fhir.rest.api.Constants;
import ca.uhn.fhir.rest.api.SortOrderEnum;
import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.param.DateRangeParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.util.DateRangeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.List;
public class GoldenResourceSearchSvcImpl implements IGoldenResourceSearchSvc {
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
private DaoRegistry myDaoRegistry;
@Autowired
private FhirContext myFhirContext;
@Override
@Transactional
public IResourcePidList fetchGoldenResourceIdsPage(Date theStart, Date theEnd, @Nonnull Integer thePageSize, @Nullable RequestPartitionId theRequestPartitionId, @Nonnull String theResourceType) {
return fetchResourceIdsPageWithResourceType(theStart, theEnd, thePageSize, theResourceType, theRequestPartitionId);
}
private IResourcePidList fetchResourceIdsPageWithResourceType(Date theStart, Date theEnd, int thePageSize, String theResourceType, RequestPartitionId theRequestPartitionId) {
RuntimeResourceDefinition def = myFhirContext.getResourceDefinition(theResourceType);
SearchParameterMap searchParamMap = myMatchUrlService.translateMatchUrl(theResourceType, def);
searchParamMap.setSort(new SortSpec(Constants.PARAM_LASTUPDATED, SortOrderEnum.ASC));
DateRangeParam chunkDateRange = DateRangeUtil.narrowDateRange(searchParamMap.getLastUpdated(), theStart, theEnd);
searchParamMap.setLastUpdated(chunkDateRange);
searchParamMap.setCount(thePageSize); // request this many pids
searchParamMap.add("_tag", new TokenParam(MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS, MdmConstants.CODE_GOLDEN_RECORD));
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(theResourceType);
SystemRequestDetails request = new SystemRequestDetails();
request.setRequestPartitionId(theRequestPartitionId);
List<ResourcePersistentId> ids = dao.searchForIds(searchParamMap, request);
Date lastDate = null;
if (ids.size() > 0) {
lastDate = dao.readByPid(ids.get(ids.size() - 1)).getMeta().getLastUpdated();
}
return new HomogeneousResourcePidList(theResourceType, ids, lastDate);
}
}

View File

@ -20,7 +20,10 @@ package ca.uhn.fhir.jpa.mdm.svc;
* #L%
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.mdm.api.IGoldenResourceMergerSvc;
@ -33,14 +36,17 @@ import ca.uhn.fhir.mdm.api.MdmLinkJson;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.mdm.api.paging.MdmPageRequest;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearAppCtx;
import ca.uhn.fhir.mdm.batch2.clear.MdmClearJobParameters;
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.mdm.provider.MdmControllerHelper;
import ca.uhn.fhir.mdm.provider.MdmControllerUtil;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.provider.MultiUrlProcessor;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.util.ParametersUtil;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IIdType;
@ -49,6 +55,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Service;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.util.HashSet;
@ -77,6 +84,8 @@ public class MdmControllerSvcImpl implements IMdmControllerSvc {
IMdmBatchJobSubmitterFactory myMdmBatchJobSubmitterFactory;
@Autowired
IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Autowired
IJobCoordinator myJobCoordinator;
public MdmControllerSvcImpl() {
}
@ -167,9 +176,25 @@ public class MdmControllerSvcImpl implements IMdmControllerSvc {
}
@Override
public IBaseParameters submitMdmClearJob(List<String> theUrls, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails) {
MultiUrlProcessor multiUrlProcessor = new MultiUrlProcessor(myFhirContext, myMdmBatchJobSubmitterFactory.getClearJobSubmitter());
return multiUrlProcessor.processUrls(theUrls, multiUrlProcessor.getBatchSize(theBatchSize), theRequestDetails);
public IBaseParameters submitMdmClearJob(@Nonnull List<String> theResourceNames, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails) {
MdmClearJobParameters params = new MdmClearJobParameters();
params.setResourceNames(theResourceNames);
if (theBatchSize != null && theBatchSize.getValue() !=null && theBatchSize.getValue().longValue() > 0) {
params.setBatchSize(theBatchSize.getValue().intValue());
}
ReadPartitionIdRequestDetails details= new ReadPartitionIdRequestDetails(null, RestOperationTypeEnum.EXTENDED_OPERATION_SERVER, null, null, null);
RequestPartitionId requestPartition = myRequestPartitionHelperSvc.determineReadPartitionForRequest(theRequestDetails, null, details);
params.setRequestPartitionId(requestPartition);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(MdmClearAppCtx.JOB_MDM_CLEAR);
request.setParameters(params);
String id = myJobCoordinator.startInstance(request);
IBaseParameters retVal = ParametersUtil.newInstance(myFhirContext);
ParametersUtil.addParameterToParametersString(myFhirContext, retVal, ProviderConstants.OPERATION_BATCH_RESPONSE_JOB_ID, id);
return retVal;
}
@Override

View File

@ -30,6 +30,7 @@ import ca.uhn.fhir.mdm.api.MdmMatchOutcome;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.mdm.log.Logs;
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseResource;
@ -39,7 +40,9 @@ import org.springframework.stereotype.Service;
import javax.annotation.Nonnull;
import javax.transaction.Transactional;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* This class is in charge of managing MdmLinks between Golden Resources and source resources
@ -97,6 +100,13 @@ public class MdmLinkSvcImpl implements IMdmLinkSvc {
}
}
@Override
@Transactional
public void deleteLinksWithAnyReferenceTo(List<ResourcePersistentId> theGoldenResourceIds) {
List<Long> goldenResourcePids = theGoldenResourceIds.stream().map(ResourcePersistentId::getIdAsLong).collect(Collectors.toList());
myMdmLinkDaoSvc.deleteLinksWithAnyReferenceToPids(goldenResourcePids);
}
/**
* Helper function which runs various business rules about what types of requests are allowed.
*/

View File

@ -1,9 +1,15 @@
package ca.uhn.fhir.jpa.mdm.config;
import ca.uhn.fhir.jpa.mdm.helper.MdmHelperR4;
import ca.uhn.fhir.jpa.subscription.channel.config.SubscriptionChannelConfig;
import ca.uhn.fhir.jpa.subscription.submit.config.SubscriptionSubmitterConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
@Import({SubscriptionSubmitterConfig.class, SubscriptionChannelConfig.class})
public class TestMdmConfigR4 extends BaseTestMdmConfig {
@Bean
MdmHelperR4 mdmHelperR4() {
return new MdmHelperR4();
}
}

View File

@ -108,5 +108,4 @@ public abstract class BaseMdmHelper implements BeforeEachCallback, AfterEachCall
public PointcutLatch getAfterMdmLatch() {
return myAfterMdmLatch;
}
}

View File

@ -5,10 +5,15 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
import ca.uhn.fhir.rest.server.TransactionLogMessages;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import static ca.uhn.fhir.mdm.api.MdmConstants.CODE_GOLDEN_RECORD;
import static ca.uhn.fhir.mdm.api.MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS;
public class MdmHelperR4 extends BaseMdmHelper {
@Autowired
private FhirContext myFhirContext;
@ -50,6 +55,13 @@ public class MdmHelperR4 extends BaseMdmHelper {
return isExternalHttpRequest ? dao.update(theResource, myMockSrd): dao.create(theResource);
}
@Nonnull
public Patient buildGoldenPatient() {
Patient patient = new Patient();
patient.getMeta().addTag(SYSTEM_GOLDEN_RECORD_STATUS, CODE_GOLDEN_RECORD, "Golden Record");
return patient;
}
/**
* OutcomeAndLogMessageWrapper is a simple wrapper class which is _excellent_. It allows us to skip the fact that java doesn't allow
* multiple returns, and wraps both the Method Outcome of the DAO, _and_ the TransactionLogMessages that were passed to the pointcut

View File

@ -22,19 +22,16 @@ import org.hl7.fhir.r4.model.Medication;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.SearchParameter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Example;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.mdm.api.MdmConstants.CODE_GOLDEN_RECORD;
import static ca.uhn.fhir.mdm.api.MdmConstants.CODE_GOLDEN_RECORD_REDIRECTED;
import static ca.uhn.fhir.mdm.api.MdmConstants.CODE_HAPI_MDM_MANAGED;
import static ca.uhn.fhir.mdm.api.MdmConstants.SYSTEM_GOLDEN_RECORD_STATUS;
@ -110,8 +107,7 @@ public class MdmStorageInterceptorIT extends BaseMdmR4Test {
@Test
public void testCreatePatientWithGoldenRecordTagForbidden() throws InterruptedException {
Patient patient = new Patient();
patient.getMeta().addTag(SYSTEM_GOLDEN_RECORD_STATUS, CODE_GOLDEN_RECORD, "Golden Record");
Patient patient = myMdmHelper.buildGoldenPatient();
try {
myMdmHelper.doCreateResource(patient, true);
fail();

View File

@ -36,7 +36,7 @@ public abstract class BaseLinkR4Test extends BaseProviderR4Test {
@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myPatient = createPatientAndUpdateLinks(buildPaulPatient());

View File

@ -1,6 +1,9 @@
package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.mdm.api.IMdmClearJobSubmitter;
import ca.uhn.fhir.mdm.api.IMdmControllerSvc;
import ca.uhn.fhir.mdm.api.IMdmSubmitSvc;
@ -41,21 +44,29 @@ public abstract class BaseProviderR4Test extends BaseMdmR4Test {
@Autowired
BatchJobHelper myBatchJobHelper;
@Autowired
Batch2JobHelper myBatch2JobHelper;
@Autowired
MessageHelper myMessageHelper;
@Autowired
private IJobCoordinator myJobCoordinator;
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
private String defaultScript;
protected void setMdmRuleJson(String theString) throws IOException {
DefaultResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(theString);
String json = IOUtils.toString(resource.getInputStream(), Charsets.UTF_8);
myMdmSettings.setEnabled(true);
myMdmSettings.setScriptText(json);
myMdmResourceMatcherSvc.init();
}
@BeforeEach
public void before() {
public void before() throws Exception {
myMdmProvider = new MdmProviderDstu3Plus(myFhirContext, myMdmControllerSvc, myMdmHelper, myMdmSubmitSvc, myMdmSettings);
// FhirContext theFhirContext, IJobCoordinator theJobCoordinator, IRequestPartitionHelperSvc theRequestPartitionHelperSvc
defaultScript = myMdmSettings.getScriptText();
}
@ -69,12 +80,12 @@ public abstract class BaseProviderR4Test extends BaseMdmR4Test {
protected void clearMdmLinks() {
Parameters result = (Parameters) myMdmProvider.clearMdmLinks(null, null, myRequestDetails);
myBatchJobHelper.awaitJobExecution(BatchHelperR4.jobIdFromParameters(result));
myBatch2JobHelper.awaitJobCompletion(BatchHelperR4.jobIdFromBatch2Parameters(result));
}
protected void clearMdmLinks(String theResourceName) {
Parameters result = (Parameters) myMdmProvider.clearMdmLinks(getResourceNames(theResourceName), null, myRequestDetails);
myBatchJobHelper.awaitJobExecution(BatchHelperR4.jobIdFromParameters(result));
myBatch2JobHelper.awaitJobCompletion(BatchHelperR4.jobIdFromBatch2Parameters(result));
}
@Nonnull

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.IInterceptorService;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.mdm.rules.config.MdmSettings;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.test.concurrency.PointcutLatch;
@ -25,7 +26,6 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.fail;
public class MdmProviderBatchR4Test extends BaseLinkR4Test {
public static final String ORGANIZATION_DUMMY = "Organization/dummy";
protected Practitioner myPractitioner;
protected StringType myPractitionerId;
@ -38,11 +38,14 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
@Autowired
IInterceptorService myInterceptorService;
@Autowired
MdmSettings myMdmSettings;
PointcutLatch afterMdmLatch = new PointcutLatch(Pointcut.MDM_AFTER_PERSISTED_RESOURCE_CHECKED);
@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myPractitioner = createPractitionerAndUpdateLinks(buildPractitionerWithNameAndId("some_pract", "some_pract_id"));
myPractitionerId = new StringType(myPractitioner.getIdElement().getValue());
@ -59,12 +62,14 @@ public class MdmProviderBatchR4Test extends BaseLinkR4Test {
myGoldenMedicationId = new StringType(myGoldenMedication.getIdElement().getValue());
myInterceptorService.registerAnonymousInterceptor(Pointcut.MDM_AFTER_PERSISTED_RESOURCE_CHECKED, afterMdmLatch);
myMdmSettings.setEnabled(true);
}
@Override
@AfterEach
public void after() throws IOException {
myInterceptorService.unregisterInterceptor(afterMdmLatch);
myMdmSettings.setEnabled(false);
super.after();
}

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.mdm.api.MdmConstants;
@ -9,7 +8,7 @@ import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Practitioner;
@ -36,12 +35,14 @@ public class MdmProviderClearLinkR4Test extends BaseLinkR4Test {
protected StringType myPractitionerGoldenResourceId;
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myPractitioner = createPractitionerAndUpdateLinks(new Practitioner());
myPractitionerId = new StringType(myPractitioner.getIdElement().getValue());
myPractitionerGoldenResource = getGoldenResourceFromTargetResource(myPractitioner);
myPractitionerGoldenResourceId = new StringType(myPractitionerGoldenResource.getIdElement().getValue());
setMdmRuleJson("mdm/nickname-mdm-rules.json");
}
@Test
@ -74,7 +75,8 @@ public class MdmProviderClearLinkR4Test extends BaseLinkR4Test {
try {
myPatientDao.read(new IdDt(mySourcePatientId.getValueAsString()).toVersionless());
fail();
} catch (ResourceNotFoundException e) {
} catch (ResourceGoneException e) {
// Expected exception
}
}
@ -162,7 +164,7 @@ public class MdmProviderClearLinkR4Test extends BaseLinkR4Test {
try {
myPractitionerDao.read(new IdDt(myPractitionerGoldenResourceId.getValueAsString()).toVersionless());
fail();
} catch (ResourceNotFoundException e) {
} catch (ResourceGoneException e) {
}
}
@ -172,7 +174,7 @@ public class MdmProviderClearLinkR4Test extends BaseLinkR4Test {
myMdmProvider.clearMdmLinks(getResourceNames("Observation"), null, myRequestDetails);
fail();
} catch (InvalidRequestException e) {
assertThat(e.getMessage(), is(equalTo(Msg.code(1500) + "$mdm-clear does not support resource type: Observation")));
assertThat(e.getMessage(), is(equalTo("HAPI-1500: $mdm-clear does not support resource type: Observation")));
}
}

View File

@ -31,7 +31,7 @@ public class MdmProviderMatchR4Test extends BaseProviderR4Test {
@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
}

View File

@ -1,7 +1,7 @@
package ca.uhn.fhir.jpa.mdm.provider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
@ -38,7 +38,7 @@ public class MdmProviderMergeGoldenResourcesR4Test extends BaseProviderR4Test {
@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myFromGoldenPatient = createGoldenPatient();

View File

@ -34,7 +34,7 @@ public class MdmProviderNotDuplicateGoldenResourceR4Test extends BaseProviderR4T
@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myGoldenPatient = createGoldenPatient();

View File

@ -12,14 +12,12 @@ import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.dstu3.model.UnsignedIntType;
import org.hl7.fhir.instance.model.api.IAnyResource;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.BooleanType;
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.Type;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@ -48,7 +46,7 @@ public class MdmProviderQueryLinkR4Test extends BaseLinkR4Test {
@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
// Add a second patient

View File

@ -7,30 +7,32 @@ import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.mdm.provider.BaseLinkR4Test;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.mdm.api.IMdmControllerSvc;
import ca.uhn.fhir.mdm.api.MdmLinkJson;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.mdm.api.paging.MdmPageRequest;
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.mdm.rules.config.MdmSettings;
import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.test.utilities.BatchJobHelper;
import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.hl7.fhir.r4.model.DecimalType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.data.domain.Page;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@ -53,15 +55,24 @@ public class MdmControllerSvcImplTest extends BaseLinkR4Test {
@Autowired
private IInterceptorService myInterceptorService;
@Autowired
private BatchJobHelper myBatchJobHelper;
private Batch2JobHelper myBatch2JobHelper;
@Autowired
private MdmSettings myMdmSettings;
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myPartitionSettings.setPartitioningEnabled(true);
myPartitionLookupSvc.createPartition(new PartitionEntity().setId(1).setName(PARTITION_1));
myPartitionLookupSvc.createPartition(new PartitionEntity().setId(2).setName(PARTITION_2));
myInterceptorService.registerInterceptor(new RequestTenantPartitionInterceptor());
myMdmSettings.setEnabled(true);
}
@AfterEach
public void after() throws IOException {
myMdmSettings.setEnabled(false);
super.after();
}
@Test
@ -130,14 +141,13 @@ public class MdmControllerSvcImplTest extends BaseLinkR4Test {
assertLinkCount(3);
List<String> urls = new ArrayList<>();
urls.add("Practitioner?");
urls.add("Practitioner");
IPrimitiveType<BigDecimal> batchSize = new DecimalType(new BigDecimal(100));
ServletRequestDetails details = new ServletRequestDetails();
details.setTenantId(PARTITION_2);
IBaseParameters clearJob = myMdmControllerSvc.submitMdmClearJob(urls, batchSize, details);
Long jobId = Long.valueOf(((DecimalType) ((Parameters) clearJob).getParameter("jobId")).getValueAsString());
JobExecution jobExecution = myBatchJobHelper.awaitJobExecution(jobId);
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
String jobId = ((StringType) ((Parameters) clearJob).getParameter("jobId")).getValueAsString();
myBatch2JobHelper.awaitJobCompletion(jobId);
assertLinkCount(2);
}

View File

@ -0,0 +1,135 @@
package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.mdm.BaseMdmR4Test;
import ca.uhn.fhir.jpa.mdm.helper.MdmHelperR4;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.hl7.fhir.r4.model.Patient;
import org.hl7.fhir.r4.model.Reference;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
class MdmClearStepTest extends BaseMdmR4Test {
private static final String GOLDEN_ID = "Patient/GOLDEN-ID";
private static final String SOURCE_ID = "Patient/SOURCE-ID";
@Autowired
MdmClearStep myMdmClearStep;
@Autowired
MdmHelperR4 myMdmHelperR4;
@Mock
IJobDataSink<VoidModel> myDataSink;
private Long mySourcePid;
private Long myGoldenPid;
private MdmLink myLink;
private String myGoldenId;
private String mySourceId;
@BeforeEach
public void before() {
Patient sourcePatient = new Patient();
mySourceId = SOURCE_ID + "1";
sourcePatient.setId(mySourceId);
myPatientDao.update(sourcePatient);
Patient goldenPatient = myMdmHelperR4.buildGoldenPatient();
myGoldenId = GOLDEN_ID + "1";
goldenPatient.setId(myGoldenId);
myPatientDao.update(goldenPatient);
mySourcePid = myIdHelperService.getPidOrThrowException(sourcePatient);
myGoldenPid = myIdHelperService.getPidOrThrowException(goldenPatient);
myLink = buildMdmLink(mySourcePid, myGoldenPid);
myMdmLinkDaoSvc.save(myLink);
}
@Test
public void testSimpleCase() {
assertPatientCount(2);
assertLinkCount(1);
mdmClearGoldenResource();
assertLinkCount(0);
assertPatientCount(1);
assertPatientExists(mySourceId);
}
@Test
public void testWithReferenceToGoldenResource() {
Patient husband = new Patient();
husband.addLink().setOther(new Reference(myGoldenId));
String husbandId = myPatientDao.create(husband).getId().toUnqualifiedVersionless().getValue();
assertPatientCount(3);
assertLinkCount(1);
try {
mdmClearGoldenResource();
fail();
} catch (ResourceVersionConflictException e) {
assertEquals("HAPI-0550: HAPI-0515: Unable to delete " + myGoldenId +
" because at least one resource has a reference to this resource. First reference found was resource " +
husbandId + " in path Patient.link.other", e.getMessage());
}
}
private void mdmClearGoldenResource() {
ResourceIdListWorkChunkJson chunk = new ResourceIdListWorkChunkJson();
chunk.addTypedPid("Patient", myGoldenPid);
RequestDetails requestDetails = new SystemRequestDetails();
TransactionDetails transactionDetails = new TransactionDetails();
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = buildStepExecutionDetails(chunk);
myMdmClearStep.myHapiTransactionService.execute(requestDetails, transactionDetails, myMdmClearStep.buildJob(requestDetails, transactionDetails, stepExecutionDetails));
}
@NotNull
private StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> buildStepExecutionDetails(ResourceIdListWorkChunkJson chunk) {
String instanceId = UUID.randomUUID().toString();
String chunkid = UUID.randomUUID().toString();
MdmClearJobParameters parms = new MdmClearJobParameters();
StepExecutionDetails<MdmClearJobParameters, ResourceIdListWorkChunkJson> stepExecutionDetails = new StepExecutionDetails<>(parms, chunk, instanceId, chunkid);
return stepExecutionDetails;
}
private MdmLink buildMdmLink(Long sourcePid, Long goldenPid) {
return new MdmLink()
.setSourcePid(sourcePid)
.setGoldenResourcePid(goldenPid)
.setLinkSource(MdmLinkSourceEnum.MANUAL)
.setMatchResult(MdmMatchResultEnum.MATCH)
.setVersion("1");
}
private void assertPatientExists(String theSourceId) {
assertNotNull(myPatientDao.read(new IdDt(theSourceId)));
}
private void assertPatientCount(int theExpectedCount) {
assertEquals(theExpectedCount, myPatientDao.search(SearchParameterMap.newSynchronous()).size());
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -98,7 +98,7 @@ public class InMemoryResourceMatcher {
validationSupportState = ValidationSupportInitializationState.INITIALIZED;
} catch (BeansException | ConfigurationException ignore) {
// We couldn't get a validation support bean, and we don't want to waste cycles trying again
ourLog.warn(Msg.code(2095) + "No bean satisfying IValidationSupport could be initialized. Qualifiers dependent on IValidationSupport will not be supported.");
ourLog.warn(Msg.code(2100) + "No bean satisfying IValidationSupport could be initialized. Qualifiers dependent on IValidationSupport will not be supported.");
validationSupportState = ValidationSupportInitializationState.FAILED;
}
}

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -39,7 +39,7 @@ public class Batch2JobHelper {
@Autowired
private IJobCoordinator myJobCoordinator;
public void awaitMultipleChunkJobCompletion(String theId) {
public void awaitJobCompletion(String theId) {
await().until(() -> {
myJobMaintenanceService.runMaintenancePass();
return myJobCoordinator.getInstance(theId).getStatus();

View File

@ -1,6 +1,7 @@
package ca.uhn.fhir.jpa.batch.mdm.job;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.batch.mdm.batch.job.MdmLinkDeleter;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

View File

@ -15,7 +15,6 @@ import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.test.Batch2JobHelper;
import ca.uhn.fhir.model.api.IModelJson;
@ -190,7 +189,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// wait for last step to finish
myLastStepLatch.setExpectedCount(1);
myBatch2JobHelper.awaitMultipleChunkJobCompletion(instanceId);
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
// verify
@ -233,7 +232,7 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
myBatch2JobHelper.awaitGatedStepId(FIRST_STEP_ID, instanceId);
myLastStepLatch.setExpectedCount(2);
myBatch2JobHelper.awaitMultipleChunkJobCompletion(instanceId);
myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
}

View File

@ -17,12 +17,12 @@ import ca.uhn.fhir.jpa.bulk.export.model.BulkExportJobStatusEnum;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportCollectionFileDao;
import ca.uhn.fhir.jpa.dao.data.IBulkExportJobDao;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionEntity;
import ca.uhn.fhir.jpa.entity.BulkExportCollectionFileEntity;
import ca.uhn.fhir.jpa.entity.BulkExportJobEntity;
import ca.uhn.fhir.jpa.entity.MdmLink;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.mdm.api.MdmLinkSourceEnum;
import ca.uhn.fhir.mdm.api.MdmMatchResultEnum;
import ca.uhn.fhir.parser.IParser;
@ -36,7 +36,6 @@ import ca.uhn.fhir.util.UrlUtil;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.time.DateUtils;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Binary;
@ -80,7 +79,6 @@ import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@ -3,7 +3,7 @@ package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexChunkIds;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
@ -50,9 +50,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
data.addTypedPid("Patient", id0);
data.addTypedPid("Patient", id1);
// Execute
@ -80,9 +80,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
data.addTypedPid("Patient", id0);
data.addTypedPid("Patient", id1);
// Execute
@ -108,9 +108,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON")).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS")).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
data.addTypedPid("Patient", id0);
data.addTypedPid("Patient", id1);
runInTransaction(() -> {
myResourceIndexedSearchParamStringDao.deleteByResourceId(id0);
@ -145,9 +145,9 @@ public class ReindexStepTest extends BaseJpaR4Test {
Long id0 = createPatient(withActiveTrue(), withFamily("SIMPSON"), withOrganization(orgId)).getIdPartAsLong();
Long id1 = createPatient(withActiveTrue(), withFamily("FLANDERS"), withOrganization(orgId)).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
data.addTypedPid("Patient", id0);
data.addTypedPid("Patient", id1);
SearchParameter sp = new SearchParameter();
sp.setType(Enumerations.SearchParamType.STRING);
@ -208,11 +208,11 @@ public class ReindexStepTest extends BaseJpaR4Test {
Long idPatientToInvalidate = createPatient().getIdPartAsLong();
Long idObservation = createObservation(withSubject(new IdType("Patient/" + idPatientToInvalidate))).getIdPartAsLong();
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id0.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(id1.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(idPatientToInvalidate.toString()));
data.getIds().add(new ReindexChunkIds.Id().setResourceType("Observation").setId(idObservation.toString()));
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson();
data.addTypedPid("Patient", id0);
data.addTypedPid("Patient", id1);
data.addTypedPid("Patient", idPatientToInvalidate);
data.addTypedPid("Observation", idObservation);
runInTransaction(() -> {
// Swap in some invalid text, which will cause an error when we go to reindex

View File

@ -1,15 +1,18 @@
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep.DEFAULT_PAGE_SIZE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -51,12 +54,12 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
IResourceReindexSvc.IdChunk page = mySvc.fetchResourceIdsPage(start, end, null, null);
IResourcePidList page = mySvc.fetchResourceIdsPage(start, end, DEFAULT_PAGE_SIZE, null, null);
// Verify
assertThat(page.getResourceTypes().toString(), page.getResourceTypes(), contains("Patient", "Patient", "Observation"));
assertThat(page.getIds(), contains(new ResourcePersistentId(id0), new ResourcePersistentId(id1), new ResourcePersistentId(id2)));
assertEquals(3, page.size());
assertThat(page.getTypedResourcePids(), contains(new TypedResourcePid("Patient", id0), new TypedResourcePid("Patient", id1), new TypedResourcePid("Observation", id2)));
assertTrue(page.getLastDate().after(beforeLastInRange));
assertTrue(page.getLastDate().before(end));
@ -81,12 +84,12 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
IResourceReindexSvc.IdChunk page = mySvc.fetchResourceIdsPage(start, end, null, null);
IResourcePidList page = mySvc.fetchResourceIdsPage(start, end, DEFAULT_PAGE_SIZE, null, null);
// Verify
assertEquals(0, page.getResourceTypes().size());
assertEquals(0, page.getIds().size());
assertTrue(page.isEmpty());
assertEquals(0, page.size());
assertNull(page.getLastDate());
assertEquals(1, myCaptureQueriesListener.logSelectQueries().size());
@ -129,12 +132,13 @@ public class ResourceReindexSvcImplTest extends BaseJpaR4Test {
// Execute
myCaptureQueriesListener.clear();
IResourceReindexSvc.IdChunk page = mySvc.fetchResourceIdsPage(start, end, null, "Patient?active=false");
IResourcePidList page = mySvc.fetchResourceIdsPage(start, end, DEFAULT_PAGE_SIZE, null, "Patient?active=false");
// Verify
assertThat(page.getResourceTypes().toString(), page.getResourceTypes(), contains("Patient", "Patient"));
assertThat(page.getIds(), contains(new ResourcePersistentId(id0), new ResourcePersistentId(id1)));
assertEquals(2, page.size());
List<TypedResourcePid> typedResourcePids = page.getTypedResourcePids();
assertThat(page.getTypedResourcePids(), contains(new TypedResourcePid("Patient", id0), new TypedResourcePid("Patient", id1)));
assertTrue(page.getLastDate().after(beforeLastInRange));
assertTrue(page.getLastDate().before(end));

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@ -32,6 +32,11 @@
<artifactId>hapi-fhir-storage</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-storage-batch2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>

View File

@ -54,5 +54,5 @@ public interface IMdmControllerSvc {
IAnyResource createLink(String theGoldenResourceId, String theSourceResourceId, @Nullable String theMatchResult, MdmTransactionContext theMdmTransactionContext);
IBaseParameters submitMdmClearJob(List<String> theUrls, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails);
IBaseParameters submitMdmClearJob(List<String> theResourceNames, IPrimitiveType<BigDecimal> theBatchSize, ServletRequestDetails theRequestDetails);
}

View File

@ -21,8 +21,11 @@ package ca.uhn.fhir.mdm.api;
*/
import ca.uhn.fhir.mdm.model.MdmTransactionContext;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.instance.model.api.IAnyResource;
import java.util.List;
public interface IMdmLinkSvc {
/**
@ -45,4 +48,10 @@ public interface IMdmLinkSvc {
* @param theMdmTransactionContext
*/
void deleteLink(IAnyResource theExistingGoldenResource, IAnyResource theSourceResource, MdmTransactionContext theMdmTransactionContext);
/**
* Delete all link records whose source or target points to the provided pids.
* @param thePersistentIds
*/
void deleteLinksWithAnyReferenceTo(List<ResourcePersistentId> thePersistentIds);
}

View File

@ -153,7 +153,6 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
List<String> resourceNames = new ArrayList<>();
if (theResourceNames != null) {
resourceNames.addAll(theResourceNames.stream().map(IPrimitiveType::getValue).collect(Collectors.toList()));
validateResourceNames(resourceNames);
@ -161,8 +160,7 @@ public class MdmProviderDstu3Plus extends BaseMdmProvider {
resourceNames.addAll(myMdmSettings.getMdmRules().getMdmTypes());
}
List<String> urls = resourceNames.stream().map(s -> s + "?").collect(Collectors.toList());
return myMdmControllerSvc.submitMdmClearJob(urls, theBatchSize, theRequestDetails);
return myMdmControllerSvc.submitMdmClearJob(resourceNames, theBatchSize, theRequestDetails);
}
private void validateResourceNames(List<String> theResourceNames) {

View File

@ -53,14 +53,11 @@ public class MdmProviderLoader {
switch (myFhirContext.getVersion().getVersion()) {
case DSTU3:
case R4:
myResourceProviderFactory.addSupplier(() -> {
myMdmProvider = new MdmProviderDstu3Plus(myFhirContext,
myResourceProviderFactory.addSupplier(() -> new MdmProviderDstu3Plus(myFhirContext,
myMdmControllerSvc,
myMdmControllerHelper,
myMdmSubmitSvc,
myMdmSettings);
return myMdmProvider;
});
myMdmSettings));
break;
default:
throw new ConfigurationException(Msg.code(1497) + "MDM not supported for FHIR version " + myFhirContext.getVersion().getVersion());

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-apache</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-client-okhttp</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-sample-server-jersey</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-spring-boot</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
</parent>
<artifactId>hapi-fhir-spring-boot-samples</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -1,5 +1,25 @@
package ca.uhn.fhir.jpa.migrate.taskdef;
/*-
* #%L
* HAPI FHIR Server - SQL Migration
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.migrate.DriverTypeEnum;

View File

@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -26,39 +26,35 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import org.hl7.fhir.r4.model.InstantType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.Date;
public class GenerateRangeChunksStep implements IFirstJobStepWorker<ReindexJobParameters, ReindexChunkRange> {
private static final Logger ourLog = LoggerFactory.getLogger(GenerateRangeChunksStep.class);
import static ca.uhn.fhir.batch2.config.Batch2Constants.BATCH_START_DATE;
@Autowired
private IResourceReindexSvc myResourceReindexSvc;
public class GenerateRangeChunksStep implements IFirstJobStepWorker<ReindexJobParameters, ReindexChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(GenerateRangeChunksStep.class);
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexChunkRange> theDataSink) throws JobExecutionFailedException {
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, VoidModel> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexChunkRangeJson> theDataSink) throws JobExecutionFailedException {
ReindexJobParameters params = theStepExecutionDetails.getParameters();
Date start = new InstantType("2000-01-01T00:00:00Z").getValue();
Date start = BATCH_START_DATE;
Date end = new Date();
if (params.getUrl().isEmpty()) {
ourLog.info("Initiating reindex of All Resources from {} to {}", start, end);
ReindexChunkRange nextRange = new ReindexChunkRange();
ReindexChunkRangeJson nextRange = new ReindexChunkRangeJson();
nextRange.setStart(start);
nextRange.setEnd(end);
theDataSink.accept(nextRange);
} else {
for (String nextUrl : params.getUrl()) {
ourLog.info("Initiating reindex of [{}]] from {} to {}", nextUrl, start, end);
ReindexChunkRange nextRange = new ReindexChunkRange();
ReindexChunkRangeJson nextRange = new ReindexChunkRangeJson();
nextRange.setUrl(nextUrl);
nextRange.setStart(start);
nextRange.setEnd(end);

View File

@ -25,108 +25,29 @@ import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, ReindexChunkRange, ReindexChunkIds> {
private static final Logger ourLog = LoggerFactory.getLogger(LoadIdsStep.class);
public class LoadIdsStep implements IJobStepWorker<ReindexJobParameters, ReindexChunkRangeJson, ResourceIdListWorkChunkJson> {
private final IResourceReindexSvc myResourceReindexSvc;
@Autowired
private IResourceReindexSvc myResourceReindexSvc;
private final ResourceIdListStep<ReindexJobParameters, ReindexChunkRangeJson> myResourceIdListStep;
public LoadIdsStep(IResourceReindexSvc theResourceReindexSvc) {
myResourceReindexSvc = theResourceReindexSvc;
IIdChunkProducer<ReindexChunkRangeJson> idChunkProducer = new ReindexIdChunkProducer(theResourceReindexSvc);
myResourceIdListStep = new ResourceIdListStep<>(idChunkProducer);
}
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexChunkRange> theStepExecutionDetails, @Nonnull IJobDataSink<ReindexChunkIds> theDataSink) throws JobExecutionFailedException {
ReindexChunkRange data = theStepExecutionDetails.getData();
Date start = data.getStart();
Date end = data.getEnd();
ourLog.info("Beginning scan for reindex IDs in range {} to {}", start, end);
Date nextStart = start;
RequestPartitionId requestPartitionId = theStepExecutionDetails.getParameters().getRequestPartitionId();
Set<ReindexChunkIds.Id> idBuffer = new LinkedHashSet<>();
long previousLastTime = 0L;
int totalIdsFound = 0;
int chunkCount = 0;
while (true) {
String url = theStepExecutionDetails.getData().getUrl();
ourLog.info("Fetching resource ID chunk for URL {} - Range {} - {}", url, nextStart, end);
IResourceReindexSvc.IdChunk nextChunk = myResourceReindexSvc.fetchResourceIdsPage(nextStart, end, requestPartitionId, url);
if (nextChunk.getIds().isEmpty()) {
ourLog.info("No data returned");
break;
}
ourLog.info("Found {} IDs from {} to {}", nextChunk.getIds().size(), nextStart, nextChunk.getLastDate());
for (int i = 0; i < nextChunk.getIds().size(); i++) {
ReindexChunkIds.Id nextId = new ReindexChunkIds.Id();
nextId.setResourceType(nextChunk.getResourceTypes().get(i));
nextId.setId(nextChunk.getIds().get(i).getId().toString());
idBuffer.add(nextId);
}
// If we get the same last time twice in a row, we've clearly reached the end
if (nextChunk.getLastDate().getTime() == previousLastTime) {
ourLog.info("Matching final timestamp of {}, loading is completed", new Date(previousLastTime));
break;
}
previousLastTime = nextChunk.getLastDate().getTime();
nextStart = nextChunk.getLastDate();
while (idBuffer.size() >= 1000) {
List<ReindexChunkIds.Id> submissionIds = new ArrayList<>();
for (Iterator<ReindexChunkIds.Id> iter = idBuffer.iterator(); iter.hasNext(); ) {
submissionIds.add(iter.next());
iter.remove();
if (submissionIds.size() >= 1000) {
break;
}
}
totalIdsFound += submissionIds.size();
chunkCount++;
submitWorkChunk(submissionIds, theDataSink);
}
}
totalIdsFound += idBuffer.size();
chunkCount++;
submitWorkChunk(idBuffer, theDataSink);
ourLog.info("Submitted {} chunks with {} resource IDs", chunkCount, totalIdsFound);
return RunOutcome.SUCCESS;
}
private void submitWorkChunk(Collection<ReindexChunkIds.Id> theIdBuffer, IJobDataSink<ReindexChunkIds> theDataSink) {
if (theIdBuffer.isEmpty()) {
return;
}
ourLog.info("Submitting work chunk with {} IDs", theIdBuffer.size());
ReindexChunkIds data = new ReindexChunkIds();
data.getIds().addAll(theIdBuffer);
theDataSink.accept(data);
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexChunkRangeJson> theStepExecutionDetails, @Nonnull IJobDataSink<ResourceIdListWorkChunkJson> theDataSink) throws JobExecutionFailedException {
return myResourceIdListStep.run(theStepExecutionDetails, theDataSink);
}
}

View File

@ -21,8 +21,10 @@ package ca.uhn.fhir.batch2.jobs.reindex;
*/
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,7 +35,7 @@ public class ReindexAppCtx {
public static final String JOB_REINDEX = "REINDEX";
@Bean
public JobDefinition<ReindexJobParameters> reindexJobDefinition() {
public JobDefinition<ReindexJobParameters> reindexJobDefinition(IResourceReindexSvc theResourceReindexSvc) {
return JobDefinition
.newBuilder()
.setJobDefinitionId(JOB_REINDEX)
@ -45,13 +47,13 @@ public class ReindexAppCtx {
.addFirstStep(
"generate-ranges",
"Generate data ranges to reindex",
ReindexChunkRange.class,
ReindexChunkRangeJson.class,
reindexGenerateRangeChunksStep())
.addIntermediateStep(
"load-ids",
"Load IDs of resources to reindex",
ReindexChunkIds.class,
loadIdsStep())
ResourceIdListWorkChunkJson.class,
loadIdsStep(theResourceReindexSvc))
.addLastStep("reindex",
"Perform the resource reindex",
reindexStep()
@ -75,8 +77,8 @@ public class ReindexAppCtx {
}
@Bean
public LoadIdsStep loadIdsStep() {
return new LoadIdsStep();
public LoadIdsStep loadIdsStep(IResourceReindexSvc theResourceReindexSvc) {
return new LoadIdsStep(theResourceReindexSvc);
}
@Bean

View File

@ -1,102 +0,0 @@
package ca.uhn.fhir.batch2.jobs.reindex;
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import java.util.ArrayList;
import java.util.List;
public class ReindexChunkIds implements IModelJson {
@JsonProperty("ids")
private List<Id> myIds;
public List<Id> getIds() {
if (myIds == null) {
myIds = new ArrayList<>();
}
return myIds;
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("ids", myIds)
.toString();
}
public static class Id implements IModelJson {
@JsonProperty("type")
private String myResourceType;
@JsonProperty("id")
private String myId;
@Override
public String toString() {
// We put a space in here and not a "/" since this is a PID, not
// a resource ID
return "[" + myResourceType + " " + myId + "]";
}
public String getResourceType() {
return myResourceType;
}
public Id setResourceType(String theResourceType) {
myResourceType = theResourceType;
return this;
}
public String getId() {
return myId;
}
public Id setId(String theId) {
myId = theId;
return this;
}
@Override
public boolean equals(Object theO) {
if (this == theO) return true;
if (theO == null || getClass() != theO.getClass()) return false;
Id id = (Id) theO;
return new EqualsBuilder().append(myResourceType, id.myResourceType).append(myId, id.myId).isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(myResourceType).append(myId).toHashCode();
}
}
}

View File

@ -1,81 +0,0 @@
package ca.uhn.fhir.batch2.jobs.reindex;
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Date;
public class ReindexChunkRange implements IModelJson {
@Nullable
@JsonProperty("url")
private String myUrl;
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
@JsonProperty("start")
@Nonnull
private Date myStart;
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
@JsonProperty("end")
@Nonnull
private Date myEnd;
@Nullable
public String getUrl() {
return myUrl;
}
public void setUrl(@Nullable String theUrl) {
myUrl = theUrl;
}
@Nonnull
public Date getStart() {
return myStart;
}
public ReindexChunkRange setStart(@Nonnull Date theStart) {
myStart = theStart;
return this;
}
@Nonnull
public Date getEnd() {
return myEnd;
}
public ReindexChunkRange setEnd(@Nonnull Date theEnd) {
myEnd = theEnd;
return this;
}
}

View File

@ -0,0 +1,41 @@
package ca.uhn.fhir.batch2.jobs.reindex;
/*-
* #%L
* hapi-fhir-storage-batch2-jobs
* %%
* Copyright (C) 2014 - 2022 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
public class ReindexChunkRangeJson extends ChunkRangeJson {
@Nullable
@JsonProperty("url")
private String myUrl;
@Nullable
public String getUrl() {
return myUrl;
}
public void setUrl(@Nullable String theUrl) {
myUrl = theUrl;
}
}

View File

@ -0,0 +1,29 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.jobs.step.IIdChunkProducer;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Date;
public class ReindexIdChunkProducer implements IIdChunkProducer<ReindexChunkRangeJson> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexIdChunkProducer.class);
private final IResourceReindexSvc myResourceReindexSvc;
public ReindexIdChunkProducer(IResourceReindexSvc theResourceReindexSvc) {
myResourceReindexSvc = theResourceReindexSvc;
}
@Override
public IResourcePidList fetchResourceIdsPage(Date theNextStart, Date theEnd, @Nonnull Integer thePageSize, @Nullable RequestPartitionId theRequestPartitionId, ReindexChunkRangeJson theData) {
String url = theData.getUrl();
ourLog.info("Fetching resource ID chunk for URL {} - Range {} - {}", url, theNextStart, theEnd);
return myResourceReindexSvc.fetchResourceIdsPage(theNextStart, theEnd, thePageSize, theRequestPartitionId, url);
}
}

View File

@ -20,6 +20,7 @@ package ca.uhn.fhir.batch2.jobs.reindex;
* #L%
*/
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedJobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -31,25 +32,12 @@ import javax.validation.constraints.Pattern;
import java.util.ArrayList;
import java.util.List;
public class ReindexJobParameters implements IModelJson {
public class ReindexJobParameters extends PartitionedJobParameters {
@JsonProperty("url")
@Nullable
private List<@Pattern(regexp = "^[A-Z][A-Za-z0-9]+\\?.*", message = "If populated, URL must be a search URL in the form '{resourceType}?[params]'") String> myUrl;
@JsonProperty(value = "partitionId")
@Nullable
private RequestPartitionId myRequestPartitionId;
@Nullable
public RequestPartitionId getRequestPartitionId() {
return myRequestPartitionId;
}
public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) {
myRequestPartitionId = theRequestPartitionId;
}
public List<String> getUrl() {
if (myUrl == null) {
myUrl = new ArrayList<>();

View File

@ -26,6 +26,7 @@ import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.api.dao.IFhirSystemDao;
@ -47,9 +48,8 @@ import org.springframework.transaction.support.TransactionCallback;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class ReindexStep implements IJobStepWorker<ReindexJobParameters, ReindexChunkIds, VoidModel> {
public class ReindexStep implements IJobStepWorker<ReindexJobParameters, ResourceIdListWorkChunkJson, VoidModel> {
private static final Logger ourLog = LoggerFactory.getLogger(ReindexStep.class);
@Autowired
@ -63,31 +63,31 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Reindex
@Nonnull
@Override
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ReindexChunkIds> theStepExecutionDetails, @Nonnull IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
public RunOutcome run(@Nonnull StepExecutionDetails<ReindexJobParameters, ResourceIdListWorkChunkJson> theStepExecutionDetails, @Nonnull IJobDataSink<VoidModel> theDataSink) throws JobExecutionFailedException {
ReindexChunkIds data = theStepExecutionDetails.getData();
ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData();
return doReindex(data, theDataSink, theStepExecutionDetails.getInstanceId(), theStepExecutionDetails.getChunkId());
}
@Nonnull
public RunOutcome doReindex(ReindexChunkIds data, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
public RunOutcome doReindex(ResourceIdListWorkChunkJson data, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
RequestDetails requestDetails = new SystemRequestDetails();
TransactionDetails transactionDetails = new TransactionDetails();
myHapiTransactionService.execute(requestDetails, transactionDetails, new ReindexJob(data, requestDetails, transactionDetails, theDataSink, theInstanceId, theChunkId));
return new RunOutcome(data.getIds().size());
return new RunOutcome(data.size());
}
private class ReindexJob implements TransactionCallback<Void> {
private final ReindexChunkIds myData;
private final ResourceIdListWorkChunkJson myData;
private final RequestDetails myRequestDetails;
private final TransactionDetails myTransactionDetails;
private final IJobDataSink<VoidModel> myDataSink;
private final String myChunkId;
private final String myInstanceId;
public ReindexJob(ReindexChunkIds theData, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
public ReindexJob(ResourceIdListWorkChunkJson theData, RequestDetails theRequestDetails, TransactionDetails theTransactionDetails, IJobDataSink<VoidModel> theDataSink, String theInstanceId, String theChunkId) {
myData = theData;
myRequestDetails = theRequestDetails;
myTransactionDetails = theTransactionDetails;
@ -99,11 +99,7 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Reindex
@Override
public Void doInTransaction(@Nonnull TransactionStatus theStatus) {
List<ResourcePersistentId> persistentIds = myData
.getIds()
.stream()
.map(t -> new ResourcePersistentId(t.getId()))
.collect(Collectors.toList());
List<ResourcePersistentId> persistentIds = myData.getResourcePersistentIds();
ourLog.info("Starting reindex work chunk with {} resources - Instance[{}] Chunk[{}]", persistentIds.size(), myInstanceId, myChunkId);
StopWatch sw = new StopWatch();
@ -116,9 +112,9 @@ public class ReindexStep implements IJobStepWorker<ReindexJobParameters, Reindex
// Reindex
sw.restart();
for (int i = 0; i < myData.getIds().size(); i++) {
for (int i = 0; i < myData.size(); i++) {
String nextResourceType = myData.getIds().get(i).getResourceType();
String nextResourceType = myData.getResourceType(i);
IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(nextResourceType);
ResourcePersistentId resourcePersistentId = persistentIds.get(i);
try {

View File

@ -2,14 +2,17 @@ package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidList;
import ca.uhn.fhir.jpa.api.svc.IResourceReindexSvc;
import ca.uhn.fhir.rest.api.server.storage.ResourcePersistentId;
import org.hl7.fhir.r4.model.InstantType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -18,6 +21,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static ca.uhn.fhir.batch2.jobs.step.ResourceIdListStep.DEFAULT_PAGE_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
@ -38,34 +42,38 @@ public class LoadIdsStepTest {
private IResourceReindexSvc myResourceReindexSvc;
@Mock
private IJobDataSink<ReindexChunkIds> mySink;
private IJobDataSink<ResourceIdListWorkChunkJson> mySink;
@InjectMocks
private LoadIdsStep mySvc;
@BeforeEach
public void before() {
mySvc = new LoadIdsStep(myResourceReindexSvc);
}
@Captor
private ArgumentCaptor<ReindexChunkIds> myChunkIdsCaptor;
private ArgumentCaptor<ResourceIdListWorkChunkJson> myChunkIdsCaptor;
@Test
public void testGenerateSteps() {
ReindexJobParameters parameters = new ReindexJobParameters();
ReindexChunkRange range = new ReindexChunkRange()
.setStart(DATE_1)
.setEnd(DATE_END);
ReindexChunkRangeJson range = new ReindexChunkRangeJson();
range.setStart(DATE_1).setEnd(DATE_END);
String instanceId = "instance-id";
String chunkId = "chunk-id";
StepExecutionDetails<ReindexJobParameters, ReindexChunkRange> details = new StepExecutionDetails<>(parameters, range, instanceId, chunkId);
StepExecutionDetails<ReindexJobParameters, ReindexChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, instanceId, chunkId);
// First Execution
when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_1), eq(DATE_END), isNull(), isNull()))
when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_1), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
.thenReturn(createIdChunk(0L, 20000L, DATE_2));
when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_2), eq(DATE_END), isNull(), isNull()))
when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_2), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
.thenReturn(createIdChunk(20000L, 40000L, DATE_3));
when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_3), eq(DATE_END), isNull(), isNull()))
when(myResourceReindexSvc.fetchResourceIdsPage(eq(DATE_3), eq(DATE_END), eq(DEFAULT_PAGE_SIZE), isNull(), isNull()))
.thenReturn(createIdChunk(40000L, 40040L, DATE_3));
mySvc.run(details, mySink);
verify(mySink, times(41)).accept(myChunkIdsCaptor.capture());
for (int i = 0; i < 40; i++) {
assertEquals(createIdChunk(i * 1000, (i * 1000) + 1000).toString(), myChunkIdsCaptor.getAllValues().get(i).toString());
@ -75,23 +83,22 @@ public class LoadIdsStepTest {
}
@Nonnull
private ReindexChunkIds createIdChunk(int theLow, int theHigh) {
ReindexChunkIds retVal = new ReindexChunkIds();
for (int i = theLow; i < theHigh; i++) {
retVal.getIds().add(new ReindexChunkIds.Id().setResourceType("Patient").setId(Integer.toString(i)));
private ResourceIdListWorkChunkJson createIdChunk(int theLow, int theHigh) {
ResourceIdListWorkChunkJson retVal = new ResourceIdListWorkChunkJson();
for (long i = theLow; i < theHigh; i++) {
retVal.addTypedPid("Patient", i);
}
return retVal;
}
@Nonnull
private IResourceReindexSvc.IdChunk createIdChunk(long idLow, long idHigh, Date lastDate) {
private IResourcePidList createIdChunk(long idLow, long idHigh, Date lastDate) {
List<ResourcePersistentId> ids = new ArrayList<>();
List<String> resourceTypes = new ArrayList<>();
for (long i = idLow; i < idHigh; i++) {
ids.add(new ResourcePersistentId(i));
resourceTypes.add("Patient");
}
IResourceReindexSvc.IdChunk chunk = new IResourceReindexSvc.IdChunk(ids, resourceTypes, lastDate);
IResourcePidList chunk = new HomogeneousResourcePidList("Patient", ids, lastDate);
return chunk;
}

View File

@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>6.1.0-PRE3-SNAPSHOT</version>
<version>6.1.0-PRE4-SNAPSHOT</version>
<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>

View File

@ -0,0 +1,13 @@
package ca.uhn.fhir.batch2.config;
import org.hl7.fhir.r4.model.InstantType;
import java.util.Date;
public class Batch2Constants {
/**
* The batch 2 system assumes that all records have a start date later than this date. This date is used as a starting
* date when performing operations that pull resources by time windows.
*/
public static final Date BATCH_START_DATE = new InstantType("2000-01-01T00:00:00Z").getValue();
}

View File

@ -46,20 +46,25 @@ public class JobDefinitionRegistry {
public <PT extends IModelJson> void addJobDefinition(@Nonnull JobDefinition<PT> theDefinition) {
Validate.notNull(theDefinition);
Validate.notBlank(theDefinition.getJobDefinitionId());
String jobDefinitionId = theDefinition.getJobDefinitionId();
Validate.notBlank(jobDefinitionId);
Validate.isTrue(theDefinition.getJobDefinitionVersion() >= 1);
Validate.isTrue(theDefinition.getSteps().size() > 1);
Set<String> stepIds = new HashSet<>();
for (JobDefinitionStep<?, ?, ?> next : theDefinition.getSteps()) {
if (!stepIds.add(next.getStepId())) {
throw new ConfigurationException(Msg.code(2046) + "Duplicate step[" + next.getStepId() + "] in definition[" + theDefinition.getJobDefinitionId() + "] version: " + theDefinition.getJobDefinitionVersion());
throw new ConfigurationException(Msg.code(2046) + "Duplicate step[" + next.getStepId() + "] in definition[" + jobDefinitionId + "] version: " + theDefinition.getJobDefinitionVersion());
}
}
TreeMap<Integer, JobDefinition<?>> versionMap = myJobs.computeIfAbsent(theDefinition.getJobDefinitionId(), t -> new TreeMap<>());
TreeMap<Integer, JobDefinition<?>> versionMap = myJobs.computeIfAbsent(jobDefinitionId, t -> new TreeMap<>());
if (versionMap.containsKey(theDefinition.getJobDefinitionVersion())) {
throw new ConfigurationException(Msg.code(2047) + "Multiple definitions for job[" + theDefinition.getJobDefinitionId() + "] version: " + theDefinition.getJobDefinitionVersion());
if (versionMap.get(theDefinition.getJobDefinitionVersion()) == theDefinition) {
ourLog.warn("job[{}] version: {} already registered. Not registering again.", jobDefinitionId, theDefinition.getJobDefinitionVersion());
return;
}
throw new ConfigurationException(Msg.code(2047) + "Multiple definitions for job[" + jobDefinitionId + "] version: " + theDefinition.getJobDefinitionVersion());
}
versionMap.put(theDefinition.getJobDefinitionVersion(), theDefinition);
}

View File

@ -0,0 +1,45 @@
package ca.uhn.fhir.batch2.jobs.chunk;
import ca.uhn.fhir.jpa.util.JsonDateDeserializer;
import ca.uhn.fhir.jpa.util.JsonDateSerializer;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nonnull;
import java.util.Date;
public class ChunkRangeJson implements IModelJson {
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
@JsonProperty("start")
@Nonnull
private Date myStart;
@JsonSerialize(using = JsonDateSerializer.class)
@JsonDeserialize(using = JsonDateDeserializer.class)
@JsonProperty("end")
@Nonnull
private Date myEnd;
@Nonnull
public Date getStart() {
return myStart;
}
public ChunkRangeJson setStart(@Nonnull Date theStart) {
myStart = theStart;
return this;
}
@Nonnull
public Date getEnd() {
return myEnd;
}
public ChunkRangeJson setEnd(@Nonnull Date theEnd) {
myEnd = theEnd;
return this;
}
}

Some files were not shown because too many files have changed in this diff Show More