Fix $reindex job with custom partition interceptor based on resource type. Update reindex job to always run with urls. (#6185)

* Refactor logic to bring together partition related logic in batch2 jobs using IJobPartitionProvider. Update logic such that reindex job without urls will attempt to create urls for all supported resource types.

* Small changes and fix of pipeline error.

* Small change to enable mdm-submit to use PartitionedUrl in the job parameters

* Revert logback change. Fix dependency version generating errors in the pipeline.

* Spotless fix. Add test dependency back without version.

* Upgrade test dependency to another version

* Add javadoc for PartitionedUrl. Other small fixes and refactoring in tests.

* Spotless fix.

* Change to JobParameters to fix some of the tests.

* Small changes for code review in test

* Address code review comments.

* Revert change from bad merge.

* Address remaining code review comments
This commit is contained in:
Martha Mitran 2024-08-07 09:51:49 -07:00 committed by GitHub
parent d5fbdaf2e4
commit 7a5bdee7ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
53 changed files with 948 additions and 816 deletions

View File

@ -0,0 +1,7 @@
---
type: change
issue: 6179
title: "The $reindex operation could potentially initiate a reindex job without any urls provided in the parameters.
We now internally generate a list of urls out of all the supported resource types and attempt to reindex
found resources of each type separately. As a result, each reindex (batch2) job chunk will be always associated with a url."

View File

@ -0,0 +1,6 @@
---
type: fix
issue: 6179
title: "Previously, the $reindex operation would fail when using a custom partitioning interceptor which decides the partition
based on the resource type in the request. This has been fixed, such that we avoid retrieving the resource type from
the request, rather we use the urls provided as parameters to the operation to determine the partitions."

View File

@ -17,7 +17,7 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.reindex;
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
@ -41,8 +41,10 @@ import ca.uhn.fhir.rest.api.SortSpec;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.DateRangeUtil;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import java.util.Date;
@ -50,7 +52,7 @@ import java.util.function.Supplier;
import java.util.stream.Stream;
public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(Batch2DaoSvcImpl.class);
private static final org.slf4j.Logger ourLog = Logs.getBatchTroubleshootingLog();
private final IResourceTableDao myResourceTableDao;
@ -83,7 +85,7 @@ public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
@Override
public IResourcePidStream fetchResourceIdStream(
Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId, String theUrl) {
if (theUrl == null) {
if (StringUtils.isBlank(theUrl)) {
return makeStreamResult(
theRequestPartitionId, () -> streamResourceIdsNoUrl(theStart, theEnd, theRequestPartitionId));
} else {
@ -127,6 +129,10 @@ public class Batch2DaoSvcImpl implements IBatch2DaoSvc {
return new TypedResourceStream(theRequestPartitionId, streamTemplate);
}
/**
* At the moment there is no use-case for this method.
* This can be cleaned up at a later point in time if there is no use for it.
*/
@Nonnull
private Stream<TypedResourcePid> streamResourceIdsNoUrl(
Date theStart, Date theEnd, RequestPartitionId theRequestPartitionId) {

View File

@ -19,7 +19,6 @@
*/
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
@ -28,8 +27,6 @@ import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import jakarta.persistence.EntityManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -55,10 +52,4 @@ public class JpaBatch2Config extends BaseBatch2Config {
theEntityManager,
theInterceptorBroadcaster);
}
@Bean
public IJobPartitionProvider jobPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
return new JpaJobPartitionProvider(theRequestPartitionHelperSvc, thePartitionLookupSvc);
}
}

View File

@ -19,45 +19,47 @@
*/
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.DefaultJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import java.util.List;
import java.util.stream.Collectors;
/**
* The default JPA implementation, which uses {@link IRequestPartitionHelperSvc} and {@link IPartitionLookupSvc}
* to compute the partition to run a batch2 job.
* to compute the {@link PartitionedUrl} list to run a batch2 job.
* The latter will be used to handle cases when the job is configured to run against all partitions
* (bulk system operation) and will return the actual list with all the configured partitions.
*/
public class JpaJobPartitionProvider implements IJobPartitionProvider {
protected final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Deprecated
public class JpaJobPartitionProvider extends DefaultJobPartitionProvider {
private final IPartitionLookupSvc myPartitionLookupSvc;
public JpaJobPartitionProvider(
IRequestPartitionHelperSvc theRequestPartitionHelperSvc, IPartitionLookupSvc thePartitionLookupSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
super(theRequestPartitionHelperSvc);
myPartitionLookupSvc = thePartitionLookupSvc;
}
public JpaJobPartitionProvider(
FhirContext theFhirContext,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
MatchUrlService theMatchUrlService,
IPartitionLookupSvc thePartitionLookupSvc) {
super(theFhirContext, theRequestPartitionHelperSvc, theMatchUrlService);
myPartitionLookupSvc = thePartitionLookupSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
if (!partitionId.isAllPartitions()) {
return List.of(partitionId);
}
// handle (bulk) system operations that are typically configured with RequestPartitionId.allPartitions()
// populate the actual list of all partitions
List<RequestPartitionId> partitionIdList = myPartitionLookupSvc.listPartitions().stream()
public List<RequestPartitionId> getAllPartitions() {
return myPartitionLookupSvc.listPartitions().stream()
.map(PartitionEntity::toRequestPartitionId)
.collect(Collectors.toList());
partitionIdList.add(RequestPartitionId.defaultPartition());
return partitionIdList;
}
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.api.svc.IDeleteExpungeSvc;
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.batch2.Batch2DaoSvcImpl;
import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc;
import ca.uhn.fhir.jpa.dao.data.IResourceLinkDao;
import ca.uhn.fhir.jpa.dao.data.IResourceTableDao;
@ -32,7 +33,6 @@ import ca.uhn.fhir.jpa.dao.expunge.ResourceTableFKProvider;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.delete.batch2.DeleteExpungeSqlBuilder;
import ca.uhn.fhir.jpa.delete.batch2.DeleteExpungeSvcImpl;
import ca.uhn.fhir.jpa.reindex.Batch2DaoSvcImpl;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import jakarta.persistence.EntityManager;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -20,7 +20,7 @@
package ca.uhn.fhir.jpa.dao;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@ -103,7 +103,6 @@ import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.rest.server.servlet.ServletRequestDetails;
import ca.uhn.fhir.rest.server.util.CompositeInterceptorBroadcaster;
import ca.uhn.fhir.util.ReflectionUtil;
@ -193,6 +192,9 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
@Autowired
private IRequestPartitionHelperSvc myRequestPartitionHelperService;
@Autowired
private IJobPartitionProvider myJobPartitionProvider;
@Autowired
private MatchUrlService myMatchUrlService;
@ -214,9 +216,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
private TransactionTemplate myTxTemplate;
@Autowired
private UrlPartitioner myUrlPartitioner;
@Autowired
private ResourceSearchUrlSvc myResourceSearchUrlSvc;
@ -1306,14 +1305,12 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
ReindexJobParameters params = new ReindexJobParameters();
List<String> urls = List.of();
if (!isCommonSearchParam(theBase)) {
addAllResourcesTypesToReindex(theBase, theRequestDetails, params);
urls = theBase.stream().map(t -> t + "?").collect(Collectors.toList());
}
RequestPartitionId requestPartition =
myRequestPartitionHelperService.determineReadPartitionForRequestForServerOperation(
theRequestDetails, ProviderConstants.OPERATION_REINDEX);
params.setRequestPartitionId(requestPartition);
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
@ -1334,14 +1331,6 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
return Boolean.parseBoolean(shouldSkip.toString());
}
private void addAllResourcesTypesToReindex(
List<String> theBase, RequestDetails theRequestDetails, ReindexJobParameters params) {
theBase.stream()
.map(t -> t + "?")
.map(url -> myUrlPartitioner.partitionUrl(url, theRequestDetails))
.forEach(params::addPartitionedUrl);
}
private boolean isCommonSearchParam(List<String> theBase) {
// If the base contains the special resource "Resource", this is a common SP that applies to all resources
return theBase.stream().map(String::toLowerCase).anyMatch(BASE_RESOURCE_NAME::equals);

View File

@ -1,76 +0,0 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.partition.IPartitionLookupSvc;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class JpaJobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private IPartitionLookupSvc myPartitionLookupSvc;
@InjectMocks
private JpaJobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId);
// test
List <RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(partitionIds).hasSize(1);
Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
@Test
public void getPartitions_requestAllPartitions_returnsListOfAllSpecificPartitions() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation)))
.thenReturn( RequestPartitionId.allPartitions());
List<RequestPartitionId> partitionIds = List.of(RequestPartitionId.fromPartitionIds(1), RequestPartitionId.fromPartitionIds(2));
List<PartitionEntity> partitionEntities = new ArrayList<>();
partitionIds.forEach(partitionId -> {
PartitionEntity entity = mock(PartitionEntity.class);
when(entity.toRequestPartitionId()).thenReturn(partitionId);
partitionEntities.add(entity);
});
when(myPartitionLookupSvc.listPartitions()).thenReturn(partitionEntities);
List<RequestPartitionId> expectedPartitionIds = new ArrayList<>(partitionIds);
expectedPartitionIds.add(RequestPartitionId.defaultPartition());
// test
List<RequestPartitionId> actualPartitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(actualPartitionIds).hasSize(expectedPartitionIds.size());
Assertions.assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds.toArray(new RequestPartitionId[0]));
}
}

View File

@ -20,6 +20,7 @@
package ca.uhn.fhir.jpa.mdm.svc;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.api.HookParams;
@ -360,9 +361,9 @@ public class MdmControllerSvcImpl implements IMdmControllerSvc {
if (hasBatchSize) {
params.setBatchSize(theBatchSize.getValue().intValue());
}
params.setRequestPartitionId(RequestPartitionId.allPartitions());
theUrls.forEach(params::addUrl);
RequestPartitionId partitionId = RequestPartitionId.allPartitions();
theUrls.forEach(
url -> params.addPartitionedUrl(new PartitionedUrl().setUrl(url).setRequestPartitionId(partitionId)));
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setParameters(params);

View File

@ -95,7 +95,7 @@ public class MatchUrlService {
}
if (Constants.PARAM_LASTUPDATED.equals(nextParamName)) {
if (paramList != null && paramList.size() > 0) {
if (!paramList.isEmpty()) {
if (paramList.size() > 2) {
throw new InvalidRequestException(Msg.code(484) + "Failed to parse match URL[" + theMatchUrl
+ "] - Can not have more than 2 " + Constants.PARAM_LASTUPDATED
@ -111,9 +111,7 @@ public class MatchUrlService {
myFhirContext, RestSearchParameterTypeEnum.HAS, nextParamName, paramList);
paramMap.add(nextParamName, param);
} else if (Constants.PARAM_COUNT.equals(nextParamName)) {
if (paramList != null
&& paramList.size() > 0
&& paramList.get(0).size() > 0) {
if (!paramList.isEmpty() && !paramList.get(0).isEmpty()) {
String intString = paramList.get(0).get(0);
try {
paramMap.setCount(Integer.parseInt(intString));
@ -123,16 +121,14 @@ public class MatchUrlService {
}
}
} else if (Constants.PARAM_SEARCH_TOTAL_MODE.equals(nextParamName)) {
if (paramList != null
&& !paramList.isEmpty()
&& !paramList.get(0).isEmpty()) {
if (!paramList.isEmpty() && !paramList.get(0).isEmpty()) {
String totalModeEnumStr = paramList.get(0).get(0);
SearchTotalModeEnum searchTotalMode = SearchTotalModeEnum.fromCode(totalModeEnumStr);
if (searchTotalMode == null) {
// We had an oops here supporting the UPPER CASE enum instead of the FHIR code for _total.
// Keep supporting it in case someone is using it.
try {
paramMap.setSearchTotalMode(SearchTotalModeEnum.valueOf(totalModeEnumStr));
searchTotalMode = SearchTotalModeEnum.valueOf(totalModeEnumStr);
} catch (IllegalArgumentException e) {
throw new InvalidRequestException(Msg.code(2078) + "Invalid "
+ Constants.PARAM_SEARCH_TOTAL_MODE + " value: " + totalModeEnumStr);
@ -141,9 +137,7 @@ public class MatchUrlService {
paramMap.setSearchTotalMode(searchTotalMode);
}
} else if (Constants.PARAM_OFFSET.equals(nextParamName)) {
if (paramList != null
&& paramList.size() > 0
&& paramList.get(0).size() > 0) {
if (!paramList.isEmpty() && !paramList.get(0).isEmpty()) {
String intString = paramList.get(0).get(0);
try {
paramMap.setOffset(Integer.parseInt(intString));
@ -238,40 +232,27 @@ public class MatchUrlService {
return getResourceSearch(theUrl, null);
}
public abstract static class Flag {
/**
* Constructor
*/
Flag() {
// nothing
}
abstract void process(
String theParamName, List<QualifiedParamList> theValues, SearchParameterMap theMapToPopulate);
public interface Flag {
void process(String theParamName, List<QualifiedParamList> theValues, SearchParameterMap theMapToPopulate);
}
/**
* Indicates that the parser should process _include and _revinclude (by default these are not handled)
*/
public static Flag processIncludes() {
return new Flag() {
@Override
void process(String theParamName, List<QualifiedParamList> theValues, SearchParameterMap theMapToPopulate) {
if (Constants.PARAM_INCLUDE.equals(theParamName)) {
for (QualifiedParamList nextQualifiedList : theValues) {
for (String nextValue : nextQualifiedList) {
theMapToPopulate.addInclude(new Include(
nextValue, ParameterUtil.isIncludeIterate(nextQualifiedList.getQualifier())));
}
return (theParamName, theValues, theMapToPopulate) -> {
if (Constants.PARAM_INCLUDE.equals(theParamName)) {
for (QualifiedParamList nextQualifiedList : theValues) {
for (String nextValue : nextQualifiedList) {
theMapToPopulate.addInclude(new Include(
nextValue, ParameterUtil.isIncludeIterate(nextQualifiedList.getQualifier())));
}
} else if (Constants.PARAM_REVINCLUDE.equals(theParamName)) {
for (QualifiedParamList nextQualifiedList : theValues) {
for (String nextValue : nextQualifiedList) {
theMapToPopulate.addRevInclude(new Include(
nextValue, ParameterUtil.isIncludeIterate(nextQualifiedList.getQualifier())));
}
}
} else if (Constants.PARAM_REVINCLUDE.equals(theParamName)) {
for (QualifiedParamList nextQualifiedList : theValues) {
for (String nextValue : nextQualifiedList) {
theMapToPopulate.addRevInclude(new Include(
nextValue, ParameterUtil.isIncludeIterate(nextQualifiedList.getQualifier())));
}
}
}

View File

@ -1,7 +1,5 @@
package ca.uhn.fhir.jpa.searchparam;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import ca.uhn.fhir.i18n.Msg;
@ -22,10 +20,10 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.assertj.core.api.Assertions.within;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -104,19 +102,19 @@ public class MatchUrlServiceTest extends BaseJpaTest {
@Test
void testTotal_fromStandardLowerCase() {
// given
// when
// given
// when
var map = myMatchUrlService.translateMatchUrl("Patient?family=smith&_total=none", ourCtx.getResourceDefinition("Patient"));
// then
assertEquals(SearchTotalModeEnum.NONE, map.getSearchTotalMode());
// then
assertEquals(SearchTotalModeEnum.NONE, map.getSearchTotalMode());
}
@Test
void testTotal_fromUpperCase() {
// given
// when
var map = myMatchUrlService.translateMatchUrl("Patient?family=smith&_total=none", ourCtx.getResourceDefinition("Patient"));
var map = myMatchUrlService.translateMatchUrl("Patient?family=smith&_total=NONE", ourCtx.getResourceDefinition("Patient"));
// then
assertEquals(SearchTotalModeEnum.NONE, map.getSearchTotalMode());

View File

@ -0,0 +1,225 @@
package ca.uhn.fhir.jpa.batch2;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.pid.TypedResourcePid;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import jakarta.annotation.Nonnull;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDate;
import java.time.Month;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
class Batch2DaoSvcImplTest extends BaseJpaR4Test {
private static final Date PREVIOUS_MILLENNIUM = toDate(LocalDate.of(1999, Month.DECEMBER, 31));
private static final Date TOMORROW = toDate(LocalDate.now().plusDays(1));
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
private IHapiTransactionService myIHapiTransactionService ;
private IBatch2DaoSvc mySvc;
@BeforeEach
void beforeEach() {
mySvc = new Batch2DaoSvcImpl(myResourceTableDao, myMatchUrlService, myDaoRegistry, myFhirContext, myIHapiTransactionService);
}
@Test
void fetchResourceIds_ByUrlInvalidUrl() {
IResourcePidStream stream = mySvc.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, null, "Patient");
final InternalErrorException exception = assertThrows(InternalErrorException.class, () -> stream.visitStream(Stream::toList));
assertEquals("HAPI-2422: this should never happen: URL is missing a '?'", exception.getMessage());
}
@Test
void fetchResourceIds_ByUrlSingleQuestionMark() {
IResourcePidStream stream = mySvc.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, null, "?");
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> stream.visitStream(Stream::toList));
assertEquals("theResourceName must not be blank", exception.getMessage());
}
@Test
void fetchResourceIds_ByUrlNonsensicalResource() {
IResourcePidStream stream = mySvc.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, null, "Banana?_expunge=true");
final DataFormatException exception = assertThrows(DataFormatException.class, () -> stream.visitStream(Stream::toList));
assertEquals("HAPI-1684: Unknown resource name \"Banana\" (this name is not known in FHIR version \"R4\")", exception.getMessage());
}
@ParameterizedTest
@ValueSource(ints = {0, 9, 10, 11, 21, 22, 23, 45})
void fetchResourceIds_ByUrl(int expectedNumResults) {
final List<IIdType> patientIds = IntStream.range(0, expectedNumResults)
.mapToObj(num -> createPatient())
.toList();
final IResourcePidStream resourcePidList = mySvc.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "Patient?_expunge=true");
final List<? extends IIdType> actualPatientIds =
resourcePidList.visitStream(s-> s.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList());
assertIdsEqual(patientIds, actualPatientIds);
}
@Test
public void fetchResourceIds_ByUrl_WithData() {
// Setup
createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChange();
// Start of resources within range
Date start = new Date();
sleepUntilTimeChange();
Long patientId1 = createPatient(withActiveFalse()).getIdPartAsLong();
createObservation(withObservationCode("http://foo", "bar"));
createObservation(withObservationCode("http://foo", "bar"));
sleepUntilTimeChange();
Long patientId2 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChange();
Date end = new Date();
// End of resources within range
createObservation(withObservationCode("http://foo", "bar"));
createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChange();
// Execute
myCaptureQueriesListener.clear();
IResourcePidStream queryStream = mySvc.fetchResourceIdStream(start, end, null, "Patient?active=false");
// Verify
List<TypedResourcePid> typedResourcePids = queryStream.visitStream(Stream::toList);
assertThat(typedResourcePids)
.hasSize(2)
.containsExactly(
new TypedResourcePid("Patient", patientId1),
new TypedResourcePid("Patient", patientId2));
assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(1);
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
@ParameterizedTest
@ValueSource(ints = {0, 9, 10, 11, 21, 22, 23, 45})
void fetchResourceIds_NoUrl(int expectedNumResults) {
final List<IIdType> patientIds = IntStream.range(0, expectedNumResults)
.mapToObj(num -> createPatient())
.toList();
// at the moment there is no Prod use-case for noUrl use-case
// reindex will always have urls as well (see https://github.com/hapifhir/hapi-fhir/issues/6179)
final IResourcePidStream resourcePidList = mySvc.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), null);
final List<? extends IIdType> actualPatientIds =
resourcePidList.visitStream(s-> s.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList());
assertIdsEqual(patientIds, actualPatientIds);
}
private static void assertIdsEqual(List<IIdType> expectedResourceIds, List<? extends IIdType> actualResourceIds) {
assertThat(actualResourceIds).hasSize(expectedResourceIds.size());
for (int index = 0; index < expectedResourceIds.size(); index++) {
final IIdType expectedIdType = expectedResourceIds.get(index);
final IIdType actualIdType = actualResourceIds.get(index);
assertEquals(expectedIdType.getResourceType(), actualIdType.getResourceType());
assertEquals(expectedIdType.getIdPartAsLong(), actualIdType.getIdPartAsLong());
}
}
@Nonnull
private static Date toDate(LocalDate theLocalDate) {
return Date.from(theLocalDate.atStartOfDay(ZoneId.systemDefault()).toInstant());
}
@ParameterizedTest
@NullSource
@ValueSource(strings = {"", " "})
public void fetchResourceIds_NoUrl_WithData(String theMissingUrl) {
// Setup
createPatient(withActiveFalse());
sleepUntilTimeChange();
Date start = new Date();
Long id0 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChange();
Long id1 = createPatient(withActiveFalse()).getIdPartAsLong();
sleepUntilTimeChange();
Long id2 = createObservation(withObservationCode("http://foo", "bar")).getIdPartAsLong();
sleepUntilTimeChange();
Date end = new Date();
sleepUntilTimeChange();
createPatient(withActiveFalse());
// Execute
myCaptureQueriesListener.clear();
IResourcePidStream queryStream = mySvc.fetchResourceIdStream(start, end, null, theMissingUrl);
// Verify
List<TypedResourcePid> typedPids = queryStream.visitStream(Stream::toList);
assertThat(typedPids)
.hasSize(3)
.containsExactly(
new TypedResourcePid("Patient", id0),
new TypedResourcePid("Patient", id1),
new TypedResourcePid("Observation", id2));
assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(1);
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
@ParameterizedTest
@NullSource
@ValueSource(strings = {"", " "})
public void fetchResourceIds_NoUrl_NoData(String theMissingUrl) {
// Execute
myCaptureQueriesListener.clear();
IResourcePidStream queryStream = mySvc.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, null, theMissingUrl);
// Verify
List<TypedResourcePid> typedPids = queryStream.visitStream(Stream::toList);
assertThat(typedPids).isEmpty();
assertThat(myCaptureQueriesListener.logSelectQueries()).hasSize(1);
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
assertEquals(1, myCaptureQueriesListener.getCommitCount());
assertEquals(0, myCaptureQueriesListener.getRollbackCount());
}
}

View File

@ -14,7 +14,6 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
import ca.uhn.fhir.jpa.subscription.channel.impl.LinkedBlockingChannel;
@ -53,7 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* {@link ca.uhn.fhir.batch2.maintenance.JobInstanceProcessor#cleanupInstance()}
* For chunks:
* {@link ca.uhn.fhir.jpa.batch2.JpaJobPersistenceImpl#onWorkChunkCreate}
* {@link JpaJobPersistenceImpl#onWorkChunkCreate}
* {@link JpaJobPersistenceImpl#onWorkChunkDequeue(String)}
* Chunk execution {@link ca.uhn.fhir.batch2.coordinator.StepExecutor#executeStep}
*/

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.batch2;
import static org.junit.jupiter.api.Assertions.assertFalse;
import ca.uhn.fhir.batch2.api.IJobMaintenanceService;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
@ -68,6 +67,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

View File

@ -1,11 +1,8 @@
package ca.uhn.fhir.jpa.dao;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
@ -68,9 +65,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.isNotNull;
@ -87,6 +85,9 @@ class BaseHapiFhirResourceDaoTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private IJobPartitionProvider myJobPartitionProvider;
@Mock
private IIdHelperService<JpaPid> myIdHelperService;
@ -102,9 +103,6 @@ class BaseHapiFhirResourceDaoTest {
@Mock
private IJpaStorageResourceParser myJpaStorageResourceParser;
@Mock
private UrlPartitioner myUrlPartitioner;
@Mock
private ApplicationContext myApplicationContext;
@ -267,12 +265,12 @@ class BaseHapiFhirResourceDaoTest {
public void requestReindexForRelatedResources_withValidBase_includesUrlsInJobParameters() {
when(myStorageSettings.isMarkResourcesForReindexingUponSearchParameterChange()).thenReturn(true);
List<String> base = Lists.newArrayList("Patient", "Group");
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
List<String> base = Lists.newArrayList("Patient", "Group", "Practitioner");
when(myUrlPartitioner.partitionUrl(any(), any())).thenAnswer(i -> {
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(i.getArgument(0));
return partitionedUrl;
when(myJobPartitionProvider.getPartitionedUrls(any(), any())).thenAnswer(i -> {
List<String> urls = i.getArgument(1);
return urls.stream().map(url -> new PartitionedUrl().setUrl(url).setRequestPartitionId(partitionId)).collect(Collectors.toList());
});
mySvc.requestReindexForRelatedResources(false, base, new ServletRequestDetails());
@ -285,9 +283,12 @@ class BaseHapiFhirResourceDaoTest {
assertNotNull(actualRequest.getParameters());
ReindexJobParameters actualParameters = actualRequest.getParameters(ReindexJobParameters.class);
assertThat(actualParameters.getPartitionedUrls()).hasSize(2);
assertEquals("Patient?", actualParameters.getPartitionedUrls().get(0).getUrl());
assertEquals("Group?", actualParameters.getPartitionedUrls().get(1).getUrl());
assertThat(actualParameters.getPartitionedUrls()).hasSize(base.size());
for (int i = 0; i < base.size(); i++) {
PartitionedUrl partitionedUrl = actualParameters.getPartitionedUrls().get(i);
assertEquals(base.get(i) + "?", partitionedUrl.getUrl());
assertEquals(partitionId, partitionedUrl.getRequestPartitionId());
}
}
@Test

View File

@ -18,7 +18,7 @@ import ca.uhn.fhir.jpa.api.model.DeleteMethodOutcome;
import ca.uhn.fhir.jpa.api.model.ExpungeOptions;
import ca.uhn.fhir.jpa.api.model.HistoryCountModeEnum;
import ca.uhn.fhir.jpa.dao.data.ISearchParamPresentDao;
import ca.uhn.fhir.jpa.delete.job.ReindexTestHelper;
import ca.uhn.fhir.jpa.reindex.ReindexTestHelper;
import ca.uhn.fhir.jpa.entity.TermValueSet;
import ca.uhn.fhir.jpa.entity.TermValueSetPreExpansionStatusEnum;
import ca.uhn.fhir.jpa.interceptor.ForceOffsetSearchModeInterceptor;

View File

@ -47,8 +47,7 @@ public class PartitioningNonNullDefaultPartitionR4Test extends BasePartitioningR
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one for search param validation
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
addReadDefaultPartition(); // and one for the reindex job
SearchParameter sp = new SearchParameter();
sp.addBase("Patient");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
@ -80,8 +79,7 @@ public class PartitioningNonNullDefaultPartitionR4Test extends BasePartitioningR
addCreateDefaultPartition();
// we need two read partition accesses for when the creation of the SP triggers a reindex of Patient
addReadDefaultPartition(); // one for search param validation
addReadDefaultPartition(); // one to rewrite the resource url
addReadDefaultPartition(); // and one for the job request itself
addReadDefaultPartition(); // and one for the reindex job
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/A");
sp.addBase("Patient");

View File

@ -3,6 +3,7 @@ package ca.uhn.fhir.jpa.dao.r4;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeAppCtx;
import ca.uhn.fhir.batch2.jobs.expunge.DeleteExpungeJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
@ -715,8 +716,10 @@ public class PartitioningSqlR4Test extends BasePartitioningR4Test {
assertEquals(1, myObservationDao.search(SearchParameterMap.newSynchronous(), mySrd).size());
DeleteExpungeJobParameters jobParameters = new DeleteExpungeJobParameters();
jobParameters.addUrl("Patient?_id=" + p1.getIdPart() + "," + p2.getIdPart());
jobParameters.setRequestPartitionId(RequestPartitionId.fromPartitionId(myPartitionId));
PartitionedUrl partitionedUrl = new PartitionedUrl()
.setUrl("Patient?_id=" + p1.getIdPart() + "," + p2.getIdPart())
.setRequestPartitionId(RequestPartitionId.fromPartitionId(myPartitionId));
jobParameters.addPartitionedUrl(partitionedUrl);
jobParameters.setCascade(true);
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();

View File

@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexStep;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
@ -45,7 +46,7 @@ public class ReindexStepTest {
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(expectedPartitionId);
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(List.of(), partitionId);
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
reindexJobParameters.setRequestPartitionId(partitionId);
reindexJobParameters.addPartitionedUrl(new PartitionedUrl().setRequestPartitionId(partitionId));
when(myHapiTransactionService.withRequest(any())).thenCallRealMethod();
when(myHapiTransactionService.buildExecutionBuilder(any())).thenCallRealMethod();

View File

@ -0,0 +1,88 @@
package ca.uhn.fhir.jpa.interceptor;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.interceptor.api.Hook;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.ReadPartitionIdRequestDetails;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.provider.BaseResourceProviderR4Test;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import jakarta.annotation.Nonnull;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.StringType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import static org.assertj.core.api.Assertions.assertThat;
public class ResourceTypePartitionInterceptorR4Test extends BaseResourceProviderR4Test {
private final MyPartitionSelectorInterceptor myPartitionInterceptor = new MyPartitionSelectorInterceptor();
@Override
@BeforeEach
public void before() {
myPartitionSettings.setPartitioningEnabled(true);
myPartitionSettings.setAllowReferencesAcrossPartitions(PartitionSettings.CrossPartitionReferenceMode.ALLOWED_UNQUALIFIED);
myInterceptorRegistry.registerInterceptor(myPartitionInterceptor);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName("PART-1"), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName("PART-2"), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(3).setName("PART-3"), null);
}
@AfterEach
public void after() {
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
myPartitionSettings.setAllowReferencesAcrossPartitions(new PartitionSettings().getAllowReferencesAcrossPartitions());
myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor);
}
@ParameterizedTest
@CsvSource(value = {"Patient?, 1", "Observation?, 1", ",3"})
public void reindex_withUrl_completesSuccessfully(String theUrl, int theExpectedIndexedResourceCount) {
IIdType patientId = createPatient(withGiven("John"));
createObservation(withSubject(patientId));
createEncounter();
Parameters input = new Parameters();
input.setParameter(ProviderConstants.OPERATION_REINDEX_PARAM_URL, theUrl);
Parameters response = myClient
.operation()
.onServer()
.named(ProviderConstants.OPERATION_REINDEX)
.withParameters(input)
.execute();
String jobId = ((StringType)response.getParameterValue(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID)).getValue();
myBatch2JobHelper.awaitJobHasStatus(jobId, StatusEnum.COMPLETED);
assertThat(myBatch2JobHelper.getCombinedRecordsProcessed(jobId)).isEqualTo(theExpectedIndexedResourceCount);
}
public class MyPartitionSelectorInterceptor {
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_CREATE)
public RequestPartitionId selectPartitionCreate(IBaseResource theResource) {
return selectPartition(myFhirContext.getResourceType(theResource));
}
@Hook(Pointcut.STORAGE_PARTITION_IDENTIFY_READ)
public RequestPartitionId selectPartitionRead(ReadPartitionIdRequestDetails theDetails) {
return selectPartition(theDetails.getResourceType());
}
@Nonnull
private static RequestPartitionId selectPartition(String resourceType) {
return switch (resourceType) {
case "Patient" -> RequestPartitionId.fromPartitionId(1);
case "Observation" -> RequestPartitionId.fromPartitionId(2);
default -> RequestPartitionId.fromPartitionId(3);
};
}
}
}

View File

@ -7,8 +7,9 @@ import ca.uhn.fhir.interceptor.api.IPointcut;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.delete.job.ReindexTestHelper;
import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamToken;
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.reindex.ReindexTestHelper;
import ca.uhn.fhir.rest.api.CacheControlDirective;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
@ -74,10 +75,10 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
public void testDeleteExpungeOperation() {
// Create patients
IIdType idAT = createPatient(withTenant(TENANT_A), withActiveTrue());
IIdType idAF = createPatient(withTenant(TENANT_A), withActiveFalse());
IIdType idBT = createPatient(withTenant(TENANT_B), withActiveTrue());
IIdType idBF = createPatient(withTenant(TENANT_B), withActiveFalse());
createPatient(withTenant(TENANT_A), withActiveTrue());
createPatient(withTenant(TENANT_A), withActiveFalse());
createPatient(withTenant(TENANT_B), withActiveTrue());
createPatient(withTenant(TENANT_B), withActiveFalse());
// validate setup
assertEquals(2, getAllPatientsInTenant(TENANT_A).getTotal());
@ -103,7 +104,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
String jobId = BatchHelperR4.jobIdFromBatch2Parameters(response);
myBatch2JobHelper.awaitJobCompletion(jobId);
assertThat(interceptor.requestPartitionIds).hasSize(4);
assertThat(interceptor.requestPartitionIds).hasSize(3);
RequestPartitionId partitionId = interceptor.requestPartitionIds.get(0);
assertEquals(TENANT_B_ID, partitionId.getFirstPartitionIdOrNull());
assertEquals(TENANT_B, partitionId.getFirstPartitionNameOrNull());
@ -127,20 +128,20 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
IIdType obsFinalA = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
myTenantClientInterceptor.setTenantId(TENANT_B);
IIdType obsFinalB = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
IIdType obsFinalD = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension());
reindexTestHelper.createAlleleSearchParameter();
// The searchparam value is on the observation, but it hasn't been indexed yet
myTenantClientInterceptor.setTenantId(TENANT_A);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
myTenantClientInterceptor.setTenantId(TENANT_B);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
// setup
Parameters input = new Parameters();
@ -163,13 +164,13 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
// validate
runInTransaction(()->{
runInTransaction(() -> {
long indexedSps = myResourceIndexedSearchParamTokenDao
.findAll()
.stream()
.filter(t->t.getParamName().equals("alleleName"))
.count();
assertEquals(1, indexedSps, ()->"Token indexes:\n * " + myResourceIndexedSearchParamTokenDao.findAll().stream().filter(t->t.getParamName().equals("alleleName")).map(ResourceIndexedSearchParamToken::toString).collect(Collectors.joining("\n * ")));
assertEquals(1, indexedSps, () -> "Token indexes:\n * " + myResourceIndexedSearchParamTokenDao.findAll().stream().filter(t->t.getParamName().equals("alleleName")).map(ResourceIndexedSearchParamToken::toString).collect(Collectors.joining("\n * ")));
});
List<String> alleleObservationIds = reindexTestHelper.getAlleleObservationIds(myClient);
@ -178,9 +179,9 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(1);
assertEquals(obsFinalA.getIdPart(), alleleObservationIds.get(0));
myTenantClientInterceptor.setTenantId(TENANT_B);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
// Reindex default partition
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
@ -198,13 +199,13 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ourLog.info("Search params: {}", mySearchParamRegistry.getActiveSearchParams("Observation").getSearchParamNames());
logAllTokenIndexes();
runInTransaction(()->{
runInTransaction(() -> {
long indexedSps = myResourceIndexedSearchParamTokenDao
.findAll()
.stream()
.filter(t->t.getParamName().equals("alleleName"))
.filter(t -> t.getParamName().equals("alleleName"))
.count();
assertEquals(3, indexedSps, ()->"Resources:\n * " + myResourceTableDao.findAll().stream().map(t->t.toString()).collect(Collectors.joining("\n * ")));
assertEquals(2, indexedSps, () -> "Resources:\n * " + myResourceTableDao.findAll().stream().map(ResourceTable::toString).collect(Collectors.joining("\n * ")));
});
myTenantClientInterceptor.setTenantId(DEFAULT_PARTITION_NAME);
@ -216,20 +217,20 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
ReindexTestHelper reindexTestHelper = new ReindexTestHelper(myFhirContext, myDaoRegistry, mySearchParamRegistry);
myTenantClientInterceptor.setTenantId(TENANT_A);
IIdType obsFinalA = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.FINAL));
IIdType obsCancelledA = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED));
doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED));
myTenantClientInterceptor.setTenantId(TENANT_B);
IIdType obsFinalB = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.FINAL));
IIdType obsCancelledB = doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED));
doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.FINAL));
doCreateResource(reindexTestHelper.buildObservationWithAlleleExtension(Observation.ObservationStatus.CANCELLED));
reindexTestHelper.createAlleleSearchParameter();
ourLog.info("Search params: {}", mySearchParamRegistry.getActiveSearchParams("Observation").getSearchParamNames());
// The searchparam value is on the observation, but it hasn't been indexed yet
myTenantClientInterceptor.setTenantId(TENANT_A);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
myTenantClientInterceptor.setTenantId(TENANT_B);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
// setup
Parameters input = new Parameters();
@ -259,7 +260,7 @@ public class MultitenantBatchOperationR4Test extends BaseMultitenantResourceProv
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(1);
assertEquals(obsFinalA.getIdPart(), alleleObservationIds.get(0));
myTenantClientInterceptor.setTenantId(TENANT_B);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).hasSize(0);
assertThat(reindexTestHelper.getAlleleObservationIds(myClient)).isEmpty();
}
private Bundle getAllPatientsInTenant(String theTenantId) {

View File

@ -1,127 +0,0 @@
package ca.uhn.fhir.jpa.reindex;
import static org.junit.jupiter.api.Assertions.assertEquals;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.parser.DataFormatException;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import jakarta.annotation.Nonnull;
import java.time.LocalDate;
import java.time.Month;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
class Batch2DaoSvcImplTest extends BaseJpaR4Test {
private static final Date PREVIOUS_MILLENNIUM = toDate(LocalDate.of(1999, Month.DECEMBER, 31));
private static final Date TOMORROW = toDate(LocalDate.now().plusDays(1));
private static final String URL_PATIENT_EXPUNGE_TRUE = "Patient?_expunge=true";
private static final String PATIENT = "Patient";
@Autowired
private MatchUrlService myMatchUrlService;
@Autowired
private IHapiTransactionService myIHapiTransactionService ;
private IBatch2DaoSvc mySubject;
@BeforeEach
void beforeEach() {
mySubject = new Batch2DaoSvcImpl(myResourceTableDao, myMatchUrlService, myDaoRegistry, myFhirContext, myIHapiTransactionService);
}
// TODO: LD this test won't work with the nonUrl variant yet: error: No existing transaction found for transaction marked with propagation 'mandatory'
@Test
void fetchResourcesByUrlEmptyUrl() {
final InternalErrorException exception =
assertThrows(
InternalErrorException.class,
() -> mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "")
.visitStream(Stream::toList));
assertEquals("HAPI-2422: this should never happen: URL is missing a '?'", exception.getMessage());
}
@Test
void fetchResourcesByUrlSingleQuestionMark() {
final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "?").visitStream(Stream::toList));
assertEquals("theResourceName must not be blank", exception.getMessage());
}
@Test
void fetchResourcesByUrlNonsensicalResource() {
final DataFormatException exception = assertThrows(DataFormatException.class, () -> mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), "Banana?_expunge=true").visitStream(Stream::toList));
assertEquals("HAPI-1684: Unknown resource name \"Banana\" (this name is not known in FHIR version \"R4\")", exception.getMessage());
}
@ParameterizedTest
@ValueSource(ints = {0, 9, 10, 11, 21, 22, 23, 45})
void fetchResourcesByUrl(int expectedNumResults) {
final List<IIdType> patientIds = IntStream.range(0, expectedNumResults)
.mapToObj(num -> createPatient())
.toList();
final IResourcePidStream resourcePidList = mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), URL_PATIENT_EXPUNGE_TRUE);
final List<? extends IIdType> actualPatientIds =
resourcePidList.visitStream(s-> s.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList());
assertIdsEqual(patientIds, actualPatientIds);
}
@ParameterizedTest
@ValueSource(ints = {0, 9, 10, 11, 21, 22, 23, 45})
void fetchResourcesNoUrl(int expectedNumResults) {
final int pageSizeWellBelowThreshold = 2;
final List<IIdType> patientIds = IntStream.range(0, expectedNumResults)
.mapToObj(num -> createPatient())
.toList();
final IResourcePidStream resourcePidList = mySubject.fetchResourceIdStream(PREVIOUS_MILLENNIUM, TOMORROW, RequestPartitionId.defaultPartition(), null);
final List<? extends IIdType> actualPatientIds =
resourcePidList.visitStream(s-> s.map(typePid -> new IdDt(typePid.resourceType, (Long) typePid.id.getId()))
.toList());
assertIdsEqual(patientIds, actualPatientIds);
}
private static void assertIdsEqual(List<IIdType> expectedResourceIds, List<? extends IIdType> actualResourceIds) {
assertThat(actualResourceIds).hasSize(expectedResourceIds.size());
for (int index = 0; index < expectedResourceIds.size(); index++) {
final IIdType expectedIdType = expectedResourceIds.get(index);
final IIdType actualIdType = actualResourceIds.get(index);
assertEquals(expectedIdType.getResourceType(), actualIdType.getResourceType());
assertEquals(expectedIdType.getIdPartAsLong(), actualIdType.getIdPartAsLong());
}
}
@Nonnull
private static Date toDate(LocalDate theLocalDate) {
return Date.from(theLocalDate.atStartOfDay(ZoneId.systemDefault()).toInstant());
}
}

View File

@ -1,6 +1,5 @@
package ca.uhn.fhir.jpa.delete.job;
package ca.uhn.fhir.jpa.reindex;
import static org.junit.jupiter.api.Assertions.assertTrue;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
@ -41,17 +40,17 @@ import java.util.List;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings("SqlDialectInspection")
public class ReindexJobTest extends BaseJpaR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;
@Autowired
private IJobPersistence myJobPersistence;

View File

@ -1,7 +1,8 @@
package ca.uhn.fhir.jpa.delete.job;
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@ -11,7 +12,6 @@ import ca.uhn.fhir.jpa.entity.PartitionEntity;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.interceptor.partition.RequestTenantPartitionInterceptor;
import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
@ -28,15 +28,13 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
@Autowired
private IJobCoordinator myJobCoordinator;
private final RequestTenantPartitionInterceptor myPartitionInterceptor = new RequestTenantPartitionInterceptor();
@BeforeEach
public void before() {
myInterceptorRegistry.registerInterceptor(myPartitionInterceptor);
myPartitionSettings.setPartitioningEnabled(true);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(1).setName("TestPartition1"), null);
myPartitionConfigSvc.createPartition(new PartitionEntity().setId(2).setName("TestPartition2"), null);
@ -61,47 +59,77 @@ public class ReindexJobWithPartitioningTest extends BaseJpaR4Test {
@AfterEach
public void after() {
myInterceptorRegistry.unregisterInterceptor(myPartitionInterceptor);
myPartitionSettings.setPartitioningEnabled(new PartitionSettings().isPartitioningEnabled());
}
public static Stream<Arguments> getReindexParameters() {
List<RequestPartitionId> twoPartitions = List.of(RequestPartitionId.fromPartitionId(1), RequestPartitionId.fromPartitionId(2));
List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
List<RequestPartitionId> allPartitions = List.of(RequestPartitionId.allPartitions());
RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1);
RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2);
RequestPartitionId allPartitions = RequestPartitionId.allPartitions();
return Stream.of(
// includes all resources from all partitions - partition 1, partition 2 and default partition
Arguments.of(List.of(), List.of(), false, 6),
// includes all Observations
Arguments.of(List.of("Observation?"), twoPartitions, false, 3),
// includes all Observations
Arguments.of(List.of("Observation?"), allPartitions, false, 3),
Arguments.of(List.of("Observation?"), List.of(), false, 0),
// includes Observations in partition 1
Arguments.of(List.of("Observation?"), partition1, true, 2),
// includes all Patients from all partitions - partition 1, partition 2 and default partition
Arguments.of(List.of("Patient?"), allPartitions, false, 3),
// includes Patients and Observations in partitions 1 and 2
Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, false, 5),
// includes Observations from partition 1 and Patients from partition 2
Arguments.of(List.of("Observation?", "Patient?"), twoPartitions, true, 3),
// includes final Observations and Patients from partitions 1 and 2
Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, false, 4),
// includes final Observations from partition 1 and Patients from partition 2
Arguments.of(List.of("Observation?status=final", "Patient?"), twoPartitions, true, 2),
// includes final Observations and Patients from partitions 1
Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2)
// 1. includes all resources
Arguments.of(List.of(), 6),
// 2. includes all resources from partition 1
Arguments.of(List.of(new PartitionedUrl().setRequestPartitionId(partition1)), 3),
// 3. includes all resources in all partitions
Arguments.of(List.of(new PartitionedUrl().setUrl("").setRequestPartitionId(allPartitions)), 6),
// 4. includes all Observations in partition 1 and partition 2
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(partition2)
), 3),
// 5. includes all Observations in all partitions (partition 1, partition 2 and default partition)
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(allPartitions)),
3),
// 6. includes all Observations in partition 1
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(partition1)),
2),
// 7. includes all Patients from all partitions
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(allPartitions)
), 3),
// 8. includes Patients and Observations in partitions 1 and 2
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(partition2)
), 3),
// 9. includes final Observations and Patients from partitions 1 and 2
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(partition2),
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(partition2)
), 5),
// 10. includes final Observations from partition 1 and Patients from partition 2
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?status=final").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("Observation?status=final").setRequestPartitionId(partition2),
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(partition2)
), 3),
// 11. includes final Observations and Patients from partitions 1
Arguments.of(
List.of(
new PartitionedUrl().setUrl("Observation?status=final").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(partition1)
), 2)
);
}
@ParameterizedTest
@MethodSource(value = "getReindexParameters")
public void testReindex_byMultipleUrlsAndPartitions_indexesMatchingResources(List<String> theUrls,
List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl,
int theExpectedIndexedResourceCount) {
JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl);
public void testReindex_withPartitionedUrls_indexesMatchingResources(List<PartitionedUrl> thePartitionedUrls,
int theExpectedIndexedResourceCount) {
PartitionedUrlJobParameters parameters = new PartitionedUrlJobParameters();
thePartitionedUrls.forEach(parameters::addPartitionedUrl);
// execute
JobInstanceStartRequest startRequest = new JobInstanceStartRequest();

View File

@ -1,4 +1,4 @@
package ca.uhn.fhir.jpa.delete.job;
package ca.uhn.fhir.jpa.reindex;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;

View File

@ -1,5 +1,6 @@
package ca.uhn.fhir.jpa.provider.r5;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexAppCtx;
import ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
@ -371,7 +372,7 @@ public class ResourceProviderR5Test extends BaseResourceProviderR5Test {
// do a reindex
ReindexJobParameters jobParameters = new ReindexJobParameters();
jobParameters.setRequestPartitionId(RequestPartitionId.allPartitions());
jobParameters.addPartitionedUrl(new PartitionedUrl().setRequestPartitionId(RequestPartitionId.allPartitions()));
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
request.setParameters(jobParameters);

View File

@ -82,7 +82,7 @@ public class Batch2JobHelper {
}
public JobInstance awaitJobHasStatus(String theInstanceId, StatusEnum... theExpectedStatus) {
return awaitJobHasStatus(theInstanceId, 10, theExpectedStatus);
return awaitJobHasStatus(theInstanceId, 30, theExpectedStatus);
}
public JobInstance awaitJobHasStatusWithoutMaintenancePass(String theInstanceId, StatusEnum... theExpectedStatus) {

View File

@ -19,10 +19,10 @@
*/
package ca.uhn.fhir.batch2.jobs.expunge;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
public class DeleteExpungeJobParameters extends JobParameters {
public class DeleteExpungeJobParameters extends PartitionedUrlJobParameters {
@JsonProperty("cascade")
private boolean myCascade;

View File

@ -55,9 +55,6 @@ public class DeleteExpungeJobParametersValidator implements IJobParametersValida
}
// Verify that the user has access to all requested partitions
myRequestPartitionHelperSvc.validateHasPartitionPermissions(
theRequestDetails, null, theParameters.getRequestPartitionId());
for (PartitionedUrl partitionedUrl : theParameters.getPartitionedUrls()) {
String url = partitionedUrl.getUrl();
ValidateUtil.isTrueOrThrowInvalidRequest(
@ -68,6 +65,6 @@ public class DeleteExpungeJobParametersValidator implements IJobParametersValida
theRequestDetails, null, partitionedUrl.getRequestPartitionId());
}
}
return myUrlListValidator.validatePartitionedUrls(theParameters.getPartitionedUrls());
return myUrlListValidator.validateUrls(theParameters.getUrls());
}
}

View File

@ -20,9 +20,9 @@
package ca.uhn.fhir.batch2.jobs.expunge;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
@ -31,7 +31,6 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IDeleteExpungeJobSubmitter;
import ca.uhn.fhir.rest.server.exceptions.ForbiddenOperationException;
@ -51,12 +50,6 @@ public class DeleteExpungeJobSubmitterImpl implements IDeleteExpungeJobSubmitter
@Autowired
IJobCoordinator myJobCoordinator;
@Autowired
FhirContext myFhirContext;
@Autowired
MatchUrlService myMatchUrlService;
@Autowired
IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@ -102,11 +95,16 @@ public class DeleteExpungeJobSubmitterImpl implements IDeleteExpungeJobSubmitter
.forEach(deleteExpungeJobParameters::addPartitionedUrl);
deleteExpungeJobParameters.setBatchSize(theBatchSize);
// TODO MM: apply changes similar to ReindexProvider to compute the PartitionedUrl list using
// IJobPartitionProvider.
// so that feature https://github.com/hapifhir/hapi-fhir/issues/6008 can be implemented for this operation
// Also set top level partition in case there are no urls
RequestPartitionId requestPartition =
myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, ProviderConstants.OPERATION_DELETE_EXPUNGE);
deleteExpungeJobParameters.setRequestPartitionId(requestPartition);
if (theUrlsToDeleteExpunge.isEmpty()) { // fix for https://github.com/hapifhir/hapi-fhir/issues/6179
RequestPartitionId requestPartition =
myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, ProviderConstants.OPERATION_DELETE_EXPUNGE);
deleteExpungeJobParameters.addPartitionedUrl(new PartitionedUrl().setRequestPartitionId(requestPartition));
}
deleteExpungeJobParameters.setCascade(theCascade);
deleteExpungeJobParameters.setCascadeMaxRounds(theCascadeMaxRounds);

View File

@ -26,7 +26,6 @@ import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep;
import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep;
import ca.uhn.fhir.batch2.model.JobDefinition;
@ -90,8 +89,7 @@ public class ReindexAppCtx {
public ReindexProvider reindexProvider(
FhirContext theFhirContext,
IJobCoordinator theJobCoordinator,
IJobPartitionProvider theJobPartitionHandler,
UrlPartitioner theUrlPartitioner) {
return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler, theUrlPartitioner);
IJobPartitionProvider theJobPartitionHandler) {
return new ReindexProvider(theFhirContext, theJobCoordinator, theJobPartitionHandler);
}
}

View File

@ -19,14 +19,14 @@
*/
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ReindexJobParameters extends JobParameters {
public class ReindexJobParameters extends PartitionedUrlJobParameters {
public static final String OPTIMIZE_STORAGE = "optimizeStorage";
public static final String REINDEX_SEARCH_PARAMETERS = "reindexSearchParameters";

View File

@ -20,8 +20,7 @@
package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobParametersValidator;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator;
import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
@ -31,30 +30,26 @@ import java.util.List;
public class ReindexJobParametersValidator implements IJobParametersValidator<ReindexJobParameters> {
private final UrlListValidator myUrlListValidator;
private final IUrlListValidator myUrlListValidator;
public ReindexJobParametersValidator(UrlListValidator theUrlListValidator) {
public ReindexJobParametersValidator(IUrlListValidator theUrlListValidator) {
myUrlListValidator = theUrlListValidator;
}
@Nullable
@Override
public List<String> validate(RequestDetails theRequestDetails, @Nonnull ReindexJobParameters theParameters) {
List<String> errors = myUrlListValidator.validatePartitionedUrls(theParameters.getPartitionedUrls());
List<String> errors = myUrlListValidator.validateUrls(theParameters.getUrls());
if (errors == null || errors.isEmpty()) {
// only check if there's no other errors (new list to fix immutable issues)
errors = new ArrayList<>();
List<PartitionedUrl> urls = theParameters.getPartitionedUrls();
for (PartitionedUrl purl : urls) {
String url = purl.getUrl();
for (String url : theParameters.getUrls()) {
if (url.contains(" ") || url.contains("\n") || url.contains("\t")) {
errors.add("Invalid URL. URL cannot contain spaces : " + url);
}
}
}
return errors;
}
}

View File

@ -21,7 +21,6 @@ package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
@ -41,6 +40,7 @@ import org.hl7.fhir.instance.model.api.IBaseParameters;
import org.hl7.fhir.instance.model.api.IPrimitiveType;
import java.util.List;
import java.util.stream.Collectors;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.OPTIMIZE_STORAGE;
import static ca.uhn.fhir.batch2.jobs.reindex.ReindexJobParameters.REINDEX_SEARCH_PARAMETERS;
@ -50,7 +50,6 @@ public class ReindexProvider {
private final FhirContext myFhirContext;
private final IJobCoordinator myJobCoordinator;
private final IJobPartitionProvider myJobPartitionProvider;
private final UrlPartitioner myUrlPartitioner;
/**
* Constructor
@ -58,12 +57,10 @@ public class ReindexProvider {
public ReindexProvider(
FhirContext theFhirContext,
IJobCoordinator theJobCoordinator,
IJobPartitionProvider theJobPartitionProvider,
UrlPartitioner theUrlPartitioner) {
IJobPartitionProvider theJobPartitionProvider) {
myFhirContext = theFhirContext;
myJobCoordinator = theJobCoordinator;
myJobPartitionProvider = theJobPartitionProvider;
myUrlPartitioner = theUrlPartitioner;
}
@Operation(name = ProviderConstants.OPERATION_REINDEX, idempotent = false)
@ -119,17 +116,15 @@ public class ReindexProvider {
params.setOptimisticLock(theOptimisticLock.getValue());
}
List<String> urls = List.of();
if (theUrlsToReindex != null) {
theUrlsToReindex.stream()
urls = theUrlsToReindex.stream()
.map(IPrimitiveType::getValue)
.filter(StringUtils::isNotBlank)
.map(url -> myUrlPartitioner.partitionUrl(url, theRequestDetails))
.forEach(params::addPartitionedUrl);
.collect(Collectors.toList());
}
myJobPartitionProvider
.getPartitions(theRequestDetails, ProviderConstants.OPERATION_REINDEX)
.forEach(params::addRequestPartitionId);
myJobPartitionProvider.getPartitionedUrls(theRequestDetails, urls).forEach(params::addPartitionedUrl);
JobInstanceStartRequest request = new JobInstanceStartRequest();
request.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);

View File

@ -3,13 +3,11 @@ package ca.uhn.fhir.batch2.jobs.reindex;
import ca.uhn.fhir.batch2.api.IJobCoordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner;
import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.dao.ReindexParameters;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import ca.uhn.fhir.test.utilities.server.RestfulServerExtension;
import org.hl7.fhir.r4.model.BooleanType;
@ -22,6 +20,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
@ -38,8 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNotNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -54,15 +53,11 @@ public class ReindexProviderTest {
private final FhirContext myCtx = FhirContext.forR4Cached();
@RegisterExtension
private final RestfulServerExtension myServerExtension = new RestfulServerExtension(myCtx);
public final RestfulServerExtension myServerExtension = new RestfulServerExtension(myCtx);
@Mock
private IJobCoordinator myJobCoordinator;
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@Mock
private UrlPartitioner myUrlPartitioner;
@Mock
private IJobPartitionProvider myJobPartitionProvider;
@ -78,7 +73,6 @@ public class ReindexProviderTest {
when(myJobCoordinator.startInstance(isNotNull(), any()))
.thenReturn(createJobStartResponse());
when(myJobPartitionProvider.getPartitions(any(), any())).thenReturn(List.of(RequestPartitionId.allPartitions()));
}
private Batch2JobStartResponse createJobStartResponse() {
@ -92,20 +86,26 @@ public class ReindexProviderTest {
myServerExtension.unregisterProvider(mySvc);
}
@Test
public void testReindex_ByUrl() {
@ParameterizedTest
@NullSource
@ValueSource(strings = {"Observation?status=active", ""})
public void testReindex_withUrlAndNonDefaultParams(String theUrl) {
// setup
Parameters input = new Parameters();
String url = "Observation?status=active";
int batchSize = 2401;
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_URL, url);
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_URL, theUrl);
input.addParameter(ProviderConstants.OPERATION_REINDEX_PARAM_BATCH_SIZE, new DecimalType(batchSize));
input.addParameter(ReindexJobParameters.REINDEX_SEARCH_PARAMETERS, new CodeType("none"));
input.addParameter(ReindexJobParameters.OPTIMISTIC_LOCK, new BooleanType(false));
input.addParameter(ReindexJobParameters.OPTIMIZE_STORAGE, new CodeType("current_version"));
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
final PartitionedUrl partitionedUrl = new PartitionedUrl().setUrl(theUrl).setRequestPartitionId(partitionId);
when(myJobPartitionProvider.getPartitionedUrls(any(), any())).thenReturn(List.of(partitionedUrl));
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
when(myUrlPartitioner.partitionUrl(anyString(), any())).thenReturn(new PartitionedUrl().setUrl(url).setRequestPartitionId(RequestPartitionId.defaultPartition()));
// Execute
Parameters response = myServerExtension
.getFhirClient()
.operation()
@ -115,54 +115,47 @@ public class ReindexProviderTest {
.execute();
// Verify
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameterValue(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
assertEquals(TEST_JOB_ID, jobId.getValue());
verify(myJobCoordinator, times(1)).startInstance(isNotNull(), myStartRequestCaptor.capture());
ReindexJobParameters params = myStartRequestCaptor.getValue().getParameters(ReindexJobParameters.class);
assertThat(params.getPartitionedUrls().iterator().next()).isEqualTo(partitionedUrl);
// Non-default values
assertEquals(ReindexParameters.ReindexSearchParametersEnum.NONE, params.getReindexSearchParameters());
assertFalse(params.getOptimisticLock());
assertEquals(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION, params.getOptimizeStorage());
}
@Test
public void testReindex_withDefaults() {
// setup
Parameters input = new Parameters();
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
// Execute
Parameters response = myServerExtension
.getFhirClient()
.operation()
.onServer()
.named(ProviderConstants.OPERATION_REINDEX)
.withParameters(input)
.execute();
// Verify
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameterValue(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
assertEquals(TEST_JOB_ID, jobId.getValue());
verify(myJobCoordinator, times(1)).startInstance(isNotNull(), myStartRequestCaptor.capture());
ReindexJobParameters params = myStartRequestCaptor.getValue().getParameters(ReindexJobParameters.class);
assertThat(params.getPartitionedUrls()).hasSize(1);
assertEquals(url, params.getPartitionedUrls().get(0).getUrl());
// Default values
assertEquals(ReindexParameters.ReindexSearchParametersEnum.ALL, params.getReindexSearchParameters());
assertTrue(params.getOptimisticLock());
assertEquals(ReindexParameters.OptimizeStorageModeEnum.NONE, params.getOptimizeStorage());
}
@Test
public void testReindex_NoUrl() {
// setup
Parameters input = new Parameters();
input.addParameter(ReindexJobParameters.REINDEX_SEARCH_PARAMETERS, new CodeType("none"));
input.addParameter(ReindexJobParameters.OPTIMISTIC_LOCK, new BooleanType(false));
input.addParameter(ReindexJobParameters.OPTIMIZE_STORAGE, new CodeType("current_version"));
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(input));
// Execute
Parameters response = myServerExtension
.getFhirClient()
.operation()
.onServer()
.named(ProviderConstants.OPERATION_REINDEX)
.withParameters(input)
.execute();
// Verify
ourLog.debug(myCtx.newJsonParser().setPrettyPrint(true).encodeResourceToString(response));
StringType jobId = (StringType) response.getParameterValue(ProviderConstants.OPERATION_REINDEX_RESPONSE_JOB_ID);
assertEquals(TEST_JOB_ID, jobId.getValue());
verify(myJobCoordinator, times(1)).startInstance(isNotNull(), myStartRequestCaptor.capture());
ReindexJobParameters params = myStartRequestCaptor.getValue().getParameters(ReindexJobParameters.class);
assertThat(params.getPartitionedUrls()).isEmpty();
// Non-default values
assertEquals(ReindexParameters.ReindexSearchParametersEnum.NONE, params.getReindexSearchParameters());
assertFalse(params.getOptimisticLock());
assertEquals(ReindexParameters.OptimizeStorageModeEnum.CURRENT_VERSION, params.getOptimizeStorage());
}
}

View File

@ -0,0 +1,95 @@
package ca.uhn.hapi.fhir.batch2.test;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public interface IJobPartitionProviderTest {
FhirContext getFhirContext();
IRequestPartitionHelperSvc getRequestPartitionHelper();
IJobPartitionProvider getJobPartitionProvider();
MatchUrlService getMatchUrlService();
@Test
default void getPartitionedUrls_noUrls_returnsCorrectly() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
setupResourceNameUrlWithPartition(requestDetails, "Patient", RequestPartitionId.fromPartitionId(1));
setupResourceNameUrlWithPartition(requestDetails, "Observation", RequestPartitionId.fromPartitionId(2));
setupResourceNameUrlWithPartition(requestDetails, "Practitioner", null);
setupResourceNameUrlWithPartition(requestDetails, "SearchParameter", RequestPartitionId.defaultPartition());
Set<String> resourceTypes = Set.of("Patient", "Observation", "Practitioner", "SearchParameter");
when(getFhirContext().getResourceTypes()).thenReturn(resourceTypes);
// execute and verify
List<PartitionedUrl> partitionedUrls = List.of(
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(RequestPartitionId.fromPartitionId(1)),
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(RequestPartitionId.fromPartitionId(2)),
new PartitionedUrl().setUrl("Practitioner?"),
new PartitionedUrl().setUrl("SearchParameter?").setRequestPartitionId(RequestPartitionId.defaultPartition()));
executeAndVerifyGetPartitionedUrls(requestDetails, List.of(), partitionedUrls);
executeAndVerifyGetPartitionedUrls(requestDetails, null, partitionedUrls);
}
@Test
default void getPartitionedUrls_withUrls_returnsCorrectly() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
setupResourceNameUrlWithPartition(requestDetails, "Patient", RequestPartitionId.fromPartitionId(1));
setupResourceNameUrlWithPartition(requestDetails, "Observation", RequestPartitionId.allPartitions());
setupResourceNameUrlWithPartition(requestDetails, "Practitioner", null);
// execute and verify
List<String> urls = List.of("Patient?", "Observation?", "Practitioner?");
List<PartitionedUrl> partitionedUrls = List.of(
new PartitionedUrl().setUrl("Patient?").setRequestPartitionId(RequestPartitionId.fromPartitionId(1)),
new PartitionedUrl().setUrl("Observation?").setRequestPartitionId(RequestPartitionId.allPartitions()),
new PartitionedUrl().setUrl("Practitioner?"));
executeAndVerifyGetPartitionedUrls(requestDetails, urls, partitionedUrls);
}
default void executeAndVerifyGetPartitionedUrls(RequestDetails theRequestDetails, List<String> theUrls, Collection<PartitionedUrl> thePartitionedUrls) {
// test
List<PartitionedUrl> actualPartitionedUrls = getJobPartitionProvider().getPartitionedUrls(theRequestDetails, theUrls);
// verify
assertThat(actualPartitionedUrls).hasSize(thePartitionedUrls.size()).containsExactlyInAnyOrder(thePartitionedUrls.toArray(new PartitionedUrl[0]));
}
default void setupResourceNameUrlWithPartition(RequestDetails theRequestDetails, String theResourceName, RequestPartitionId thePartitionId) {
final String url = theResourceName + "?";
ResourceSearch resourceSearch = mock(ResourceSearch.class);
when(getMatchUrlService().getResourceSearch(url)).thenReturn(resourceSearch);
when(resourceSearch.getResourceName()).thenReturn(theResourceName);
SearchParameterMap searchParameterMap = mock(SearchParameterMap.class);
when(resourceSearch.getSearchParameterMap()).thenReturn(searchParameterMap);
when(getRequestPartitionHelper().determineReadPartitionForRequestForSearchType(theRequestDetails, theResourceName, searchParameterMap)).thenReturn(thePartitionId);
}
void setupPartitions(List<RequestPartitionId> thePartitionIds);
}

View File

@ -19,19 +19,18 @@
*/
package ca.uhn.fhir.batch2.api;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
import java.util.stream.Collectors;
/**
* Provides the list of partitions that a job should run against.
* TODO MM: Consider moving UrlPartitioner calls to this class once other batch operations need to support running
* across all partitions on a multitenant FHIR server.
* That way all partitioning related logic exists only here for batch jobs.
* After that PartitionedUrl#myRequestPartitionId can be marked as deprecated.
* Provides the list of {@link PartitionedUrl} that a job should run against.
*/
public interface IJobPartitionProvider {
/**
* Provides the list of partitions to run job steps against, based on the request that initiates the job.
* @param theRequestDetails the requestDetails
@ -40,5 +39,14 @@ public interface IJobPartitionProvider {
*/
List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation);
// List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation, String theUrls);
/**
* Provides the list of {@link PartitionedUrl} to run job steps against, based on the request that initiates the job
* and the urls that it's configured with.
* @param theRequestDetails the requestDetails
* @param theUrls the urls to run the job against
* @return the list of {@link PartitionedUrl}
*/
default List<PartitionedUrl> getPartitionedUrls(RequestDetails theRequestDetails, List<String> theUrls) {
return theUrls.stream().map(url -> new PartitionedUrl().setUrl(url)).collect(Collectors.toList());
}
}

View File

@ -25,17 +25,19 @@ import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IReductionStepExecutorService;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.coordinator.DefaultJobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.JobCoordinatorImpl;
import ca.uhn.fhir.batch2.coordinator.JobDefinitionRegistry;
import ca.uhn.fhir.batch2.coordinator.ReductionStepExecutorServiceImpl;
import ca.uhn.fhir.batch2.coordinator.SimpleJobPartitionProvider;
import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.maintenance.JobMaintenanceServiceImpl;
import ca.uhn.fhir.batch2.model.JobWorkNotificationJsonMessage;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelConsumerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.ChannelProducerSettings;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelFactory;
@ -144,7 +146,10 @@ public abstract class BaseBatch2Config {
}
@Bean
public IJobPartitionProvider jobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
return new SimpleJobPartitionProvider(theRequestPartitionHelperSvc);
public IJobPartitionProvider jobPartitionProvider(
FhirContext theFhirContext,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
MatchUrlService theMatchUrlService) {
return new DefaultJobPartitionProvider(theFhirContext, theRequestPartitionHelperSvc, theMatchUrlService);
}
}

View File

@ -0,0 +1,107 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.ResourceSearch;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Default implementation which provides the {@link PartitionedUrl} list for a certain operation request.
*/
public class DefaultJobPartitionProvider implements IJobPartitionProvider {
protected final IRequestPartitionHelperSvc myRequestPartitionHelper;
protected FhirContext myFhirContext;
private MatchUrlService myMatchUrlService;
public DefaultJobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelper = theRequestPartitionHelperSvc;
}
public DefaultJobPartitionProvider(
FhirContext theFhirContext,
IRequestPartitionHelperSvc theRequestPartitionHelperSvc,
MatchUrlService theMatchUrlService) {
myFhirContext = theFhirContext;
myRequestPartitionHelper = theRequestPartitionHelperSvc;
myMatchUrlService = theMatchUrlService;
}
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelper.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
return List.of(partitionId);
}
@Override
public List<PartitionedUrl> getPartitionedUrls(RequestDetails theRequestDetails, List<String> theUrls) {
List<String> urls = theUrls;
// if the url list is empty, use all the supported resource types to build the url list
// we can go back to no url scenario if all resource types point to the same partition
if (theUrls == null || theUrls.isEmpty()) {
urls = myFhirContext.getResourceTypes().stream()
.map(resourceType -> resourceType + "?")
.collect(Collectors.toList());
}
// determine the partition associated with each of the urls
List<PartitionedUrl> partitionedUrls = new ArrayList<>();
for (String s : urls) {
ResourceSearch resourceSearch = myMatchUrlService.getResourceSearch(s);
RequestPartitionId partitionId = myRequestPartitionHelper.determineReadPartitionForRequestForSearchType(
theRequestDetails, resourceSearch.getResourceName(), resourceSearch.getSearchParameterMap());
partitionedUrls.add(new PartitionedUrl().setUrl(s).setRequestPartitionId(partitionId));
}
// handle (bulk) system operations that are typically configured with RequestPartitionId.allPartitions()
// populate the actual list of all partitions, if that is supported
Set<RequestPartitionId> allPartitions = new LinkedHashSet<>(getAllPartitions());
List<PartitionedUrl> retVal = new ArrayList<>();
for (PartitionedUrl partitionedUrl : partitionedUrls) {
String url = partitionedUrl.getUrl();
RequestPartitionId partitionId = partitionedUrl.getRequestPartitionId();
if (partitionId != null && partitionId.isAllPartitions() && !allPartitions.isEmpty()) {
allPartitions.stream()
.map(p -> (new PartitionedUrl().setUrl(url).setRequestPartitionId(p)))
.forEach(retVal::add);
} else {
retVal.add(partitionedUrl);
}
}
return retVal;
}
public List<RequestPartitionId> getAllPartitions() {
return List.of(RequestPartitionId.allPartitions());
}
}

View File

@ -1,45 +0,0 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.batch2.api.IJobPartitionProvider;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import java.util.List;
/**
* Basic implementation which provides the partition list for a certain request which is composed of a single partition.
*/
public class SimpleJobPartitionProvider implements IJobPartitionProvider {
protected final IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
public SimpleJobPartitionProvider(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) {
myRequestPartitionHelperSvc = theRequestPartitionHelperSvc;
}
@Override
public List<RequestPartitionId> getPartitions(RequestDetails theRequestDetails, String theOperation) {
RequestPartitionId partitionId = myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(
theRequestDetails, theOperation);
return List.of(partitionId);
}
}

View File

@ -27,7 +27,4 @@ import java.util.List;
public interface IUrlListValidator {
@Nullable
List<String> validateUrls(@Nonnull List<String> theUrls);
@Nullable
List<String> validatePartitionedUrls(@Nonnull List<PartitionedUrl> thePartitionedUrls);
}

View File

@ -23,26 +23,27 @@ import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Pattern;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
/**
* Represents the pair of partition and (search) url, which can be used to configure batch2 jobs.
* It will be used to determine which FHIR resources are selected for the job.
* Please note that the url is a partial url, which means it does not include server base and tenantId,
* and it starts with the with resource type.
* e.g. Patient?, Observation?status=final
*/
public class PartitionedUrl implements IModelJson {
@Override
public String toString() {
ToStringBuilder b = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
b.append("partition", myRequestPartitionId);
b.append("myUrl", myUrl);
return b.toString();
}
@JsonProperty("url")
@Pattern(
regexp = "^[A-Z][A-Za-z0-9]+\\?.*",
message = "If populated, URL must be a search URL in the form '{resourceType}?[params]'")
String myUrl;
private String myUrl;
@JsonProperty("requestPartitionId")
RequestPartitionId myRequestPartitionId;
private RequestPartitionId myRequestPartitionId;
public String getUrl() {
return myUrl;
@ -61,4 +62,35 @@ public class PartitionedUrl implements IModelJson {
myRequestPartitionId = theRequestPartitionId;
return this;
}
@Override
public String toString() {
ToStringBuilder b = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
b.append("myUrl", myUrl);
b.append("myRequestPartitionId", myRequestPartitionId);
return b.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof PartitionedUrl)) {
return false;
}
PartitionedUrl other = (PartitionedUrl) obj;
EqualsBuilder b = new EqualsBuilder();
b.append(myUrl, other.myUrl);
b.append(myRequestPartitionId, other.myRequestPartitionId);
return b.isEquals();
}
@Override
public int hashCode() {
HashCodeBuilder b = new HashCodeBuilder();
b.append(myRequestPartitionId);
b.append(myUrl);
return b.hashCode();
}
}

View File

@ -22,20 +22,22 @@ package ca.uhn.fhir.batch2.jobs.parameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.model.api.IModelJson;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Can be used to configure parameters for batch2 jobs.
* Please note that these need to be backward compatible as we do not have a way to migrate them to a different structure at the moment.
*/
public class JobParameters implements IModelJson {
public class PartitionedUrlJobParameters implements IModelJson {
@JsonProperty(value = "partitionId")
private List<RequestPartitionId> myRequestPartitionIds;
@Nullable
private RequestPartitionId myRequestPartitionId;
@JsonProperty("batchSize")
private Integer myBatchSize;
@ -44,31 +46,12 @@ public class JobParameters implements IModelJson {
private List<PartitionedUrl> myPartitionedUrls;
public void setRequestPartitionId(@Nullable RequestPartitionId theRequestPartitionId) {
if (theRequestPartitionId != null) {
myRequestPartitionIds = List.of(theRequestPartitionId);
}
myRequestPartitionId = theRequestPartitionId;
}
@Nullable
public RequestPartitionId getRequestPartitionId() {
return getFirstRequestPartitionIdOrNull();
}
@Nullable
private RequestPartitionId getFirstRequestPartitionIdOrNull() {
return myRequestPartitionIds == null || myRequestPartitionIds.isEmpty() ? null : myRequestPartitionIds.get(0);
}
@Nonnull
public List<RequestPartitionId> getRequestPartitionIds() {
if (myRequestPartitionIds == null) {
myRequestPartitionIds = new ArrayList<>();
}
return myRequestPartitionIds;
}
public void addRequestPartitionId(RequestPartitionId theRequestPartitionId) {
getRequestPartitionIds().add(theRequestPartitionId);
return myRequestPartitionId;
}
public void setBatchSize(int theBatchSize) {
@ -84,6 +67,10 @@ public class JobParameters implements IModelJson {
if (myPartitionedUrls == null) {
myPartitionedUrls = new ArrayList<>();
}
// TODO MM: added for backward compatibility, it can be removed once requestPartitionId is deprecated
myPartitionedUrls.stream()
.filter(thePartitionedUrl -> thePartitionedUrl.getRequestPartitionId() == null)
.forEach(thePartitionedUrl -> thePartitionedUrl.setRequestPartitionId(myRequestPartitionId));
return myPartitionedUrls;
}
@ -95,22 +82,10 @@ public class JobParameters implements IModelJson {
getPartitionedUrls().add(new PartitionedUrl().setUrl(theUrl));
}
@VisibleForTesting
public static JobParameters from(
List<String> theUrls, List<RequestPartitionId> thePartitions, boolean theShouldAssignPartitionToUrl) {
JobParameters parameters = new JobParameters();
if (theShouldAssignPartitionToUrl) {
assert theUrls.size() == thePartitions.size();
for (int i = 0; i < theUrls.size(); i++) {
PartitionedUrl partitionedUrl = new PartitionedUrl();
partitionedUrl.setUrl(theUrls.get(i));
partitionedUrl.setRequestPartitionId(thePartitions.get(i));
parameters.addPartitionedUrl(partitionedUrl);
}
} else {
theUrls.forEach(url -> parameters.addPartitionedUrl(new PartitionedUrl().setUrl(url)));
thePartitions.forEach(parameters::addRequestPartitionId);
}
return parameters;
public List<String> getUrls() {
return getPartitionedUrls().stream()
.map(PartitionedUrl::getUrl)
.filter(url -> !StringUtils.isBlank(url))
.collect(Collectors.toList());
}
}

View File

@ -25,7 +25,6 @@ import jakarta.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class UrlListValidator implements IUrlListValidator {
private final String myOperationName;
@ -39,20 +38,10 @@ public class UrlListValidator implements IUrlListValidator {
@Nullable
@Override
public List<String> validateUrls(@Nonnull List<String> theUrls) {
if (theUrls.isEmpty()) {
if (!myBatch2DaoSvc.isAllResourceTypeSupported()) {
return Collections.singletonList("At least one type-specific search URL must be provided for "
+ myOperationName + " on this server");
}
if (theUrls.isEmpty() && !myBatch2DaoSvc.isAllResourceTypeSupported()) {
return Collections.singletonList("At least one type-specific search URL must be provided for "
+ myOperationName + " on this server");
}
return Collections.emptyList();
}
@Nullable
@Override
public List<String> validatePartitionedUrls(@Nonnull List<PartitionedUrl> thePartitionedUrls) {
List<String> urls =
thePartitionedUrls.stream().map(PartitionedUrl::getUrl).collect(Collectors.toList());
return validateUrls(urls);
}
}

View File

@ -26,12 +26,10 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.util.Logs;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.slf4j.Logger;
import org.thymeleaf.util.StringUtils;
@ -40,7 +38,8 @@ import java.util.List;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
public class GenerateRangeChunksStep<PT extends JobParameters> implements IFirstJobStepWorker<PT, ChunkRangeJson> {
public class GenerateRangeChunksStep<PT extends PartitionedUrlJobParameters>
implements IFirstJobStepWorker<PT, ChunkRangeJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@Nonnull
@ -54,69 +53,26 @@ public class GenerateRangeChunksStep<PT extends JobParameters> implements IFirst
Date start = BATCH_START_DATE;
Date end = new Date();
// there are partitions configured in either of the following lists, which are both optional
// the following code considers all use-cases
// the logic can be simplified once PartitionedUrl.myRequestPartitionId is deprecated
// @see IJobPartitionProvider
List<RequestPartitionId> partitionIds = params.getRequestPartitionIds();
List<PartitionedUrl> partitionedUrls = params.getPartitionedUrls();
if (partitionIds.isEmpty()) {
if (partitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end);
sendChunk(chunkRangeJson, theDataSink);
return RunOutcome.SUCCESS;
}
if (!partitionedUrls.isEmpty()) {
partitionedUrls.forEach(partitionedUrl -> {
String url = partitionedUrl.getUrl();
RequestPartitionId partitionId = partitionedUrl.getRequestPartitionId();
ChunkRangeJson chunkRangeJson =
new ChunkRangeJson(start, end).setUrl(url).setPartitionId(partitionId);
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end)
.setUrl(partitionedUrl.getUrl())
.setPartitionId(partitionedUrl.getRequestPartitionId());
sendChunk(chunkRangeJson, theDataSink);
});
return RunOutcome.SUCCESS;
}
partitionIds.forEach(partitionId -> {
if (partitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end).setPartitionId(partitionId);
sendChunk(chunkRangeJson, theDataSink);
return;
}
partitionedUrls.forEach(partitionedUrl -> {
String url = partitionedUrl.getUrl();
RequestPartitionId urlPartitionId = partitionedUrl.getRequestPartitionId();
RequestPartitionId narrowPartitionId = determineNarrowPartitionId(partitionId, urlPartitionId);
ChunkRangeJson chunkRangeJson =
new ChunkRangeJson(start, end).setUrl(url).setPartitionId(narrowPartitionId);
sendChunk(chunkRangeJson, theDataSink);
});
});
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(start, end);
sendChunk(chunkRangeJson, theDataSink);
return RunOutcome.SUCCESS;
}
private RequestPartitionId determineNarrowPartitionId(
@Nonnull RequestPartitionId theRequestPartitionId,
@Nullable RequestPartitionId theOtherRequestPartitionId) {
if (theOtherRequestPartitionId == null) {
return theRequestPartitionId;
}
if (theRequestPartitionId.isAllPartitions() && !theOtherRequestPartitionId.isAllPartitions()) {
return theOtherRequestPartitionId;
}
if (theRequestPartitionId.isDefaultPartition()
&& !theOtherRequestPartitionId.isDefaultPartition()
&& !theOtherRequestPartitionId.isAllPartitions()) {
return theOtherRequestPartitionId;
}
return theRequestPartitionId;
}
private void sendChunk(ChunkRangeJson theData, IJobDataSink<ChunkRangeJson> theDataSink) {
String url = theData.getUrl();
ourLog.info(
ourLog.trace(
"Creating chunks for [{}] from {} to {} for partition {}",
!StringUtils.isEmpty(url) ? url : "everything",
theData.getStart(),

View File

@ -26,11 +26,11 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc;
import jakarta.annotation.Nonnull;
public class LoadIdsStep<PT extends JobParameters>
public class LoadIdsStep<PT extends PartitionedUrlJobParameters>
implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private final ResourceIdListStep<PT> myResourceIdListStep;

View File

@ -27,7 +27,7 @@ import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.chunk.TypedPidJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
import ca.uhn.fhir.util.Logs;
@ -42,7 +42,7 @@ import java.util.stream.Stream;
import static ca.uhn.fhir.util.StreamUtil.partition;
import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
public class ResourceIdListStep<PT extends JobParameters>
public class ResourceIdListStep<PT extends PartitionedUrlJobParameters>
implements IJobStepWorker<PT, ChunkRangeJson, ResourceIdListWorkChunkJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
@ -66,7 +66,12 @@ public class ResourceIdListStep<PT extends JobParameters>
Date end = data.getEnd();
Integer batchSize = theStepExecutionDetails.getParameters().getBatchSize();
ourLog.info("Beginning to submit chunks in range {} to {}", start, end);
ourLog.trace(
"Beginning to submit chunks in range {} to {} for url {} and partitionId {}",
start,
end,
data.getUrl(),
data.getPartitionId());
int chunkSize = Math.min(defaultIfNull(batchSize, MAX_BATCH_OF_IDS), MAX_BATCH_OF_IDS);
final IResourcePidStream searchResult =
@ -84,7 +89,12 @@ public class ResourceIdListStep<PT extends JobParameters>
chunkCount.getAndIncrement();
submitWorkChunk(idBatch, searchResult.getRequestPartitionId(), theDataSink);
});
ourLog.info("Submitted {} chunks with {} resource IDs", chunkCount, totalIdsFound);
ourLog.trace(
"Submitted {} chunks with {} resource IDs for url {} and partitionId {}",
chunkCount,
totalIdsFound,
data.getUrl(),
data.getPartitionId());
});
return RunOutcome.SUCCESS;
@ -97,9 +107,9 @@ public class ResourceIdListStep<PT extends JobParameters>
if (theTypedPids.isEmpty()) {
return;
}
ourLog.info("Submitting work chunk in partition {} with {} IDs", theRequestPartitionId, theTypedPids.size());
ourLog.trace("Submitting work chunk in partition {} with {} IDs", theRequestPartitionId, theTypedPids.size());
ResourceIdListWorkChunkJson data = new ResourceIdListWorkChunkJson(theTypedPids, theRequestPartitionId);
ourLog.debug("IDs are: {}", data);
ourLog.trace("IDs are: {}", data);
theDataSink.accept(data);
}
}

View File

@ -1,43 +0,0 @@
package ca.uhn.fhir.batch2.coordinator;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.provider.ProviderConstants;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SimpleJobPartitionProviderTest {
@Mock
private IRequestPartitionHelperSvc myRequestPartitionHelperSvc;
@InjectMocks
private SimpleJobPartitionProvider myJobPartitionProvider;
@Test
public void getPartitions_requestSpecificPartition_returnsPartition() {
// setup
SystemRequestDetails requestDetails = new SystemRequestDetails();
String operation = ProviderConstants.OPERATION_EXPORT;
RequestPartitionId partitionId = RequestPartitionId.fromPartitionId(1);
when(myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation(ArgumentMatchers.eq(requestDetails), ArgumentMatchers.eq(operation))).thenReturn(partitionId);
// test
List<RequestPartitionId> partitionIds = myJobPartitionProvider.getPartitions(requestDetails, operation);
// verify
Assertions.assertThat(partitionIds).hasSize(1);
Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partitionId);
}
}

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import org.junit.jupiter.api.AfterEach;
@ -17,12 +17,9 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.util.Batch2Utils.BATCH_START_DATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -31,13 +28,11 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class GenerateRangeChunksStepTest {
private final GenerateRangeChunksStep<JobParameters> myStep = new GenerateRangeChunksStep<>();
private final GenerateRangeChunksStep<PartitionedUrlJobParameters> myStep = new GenerateRangeChunksStep<>();
@Mock
private StepExecutionDetails<JobParameters, VoidModel> myStepExecutionDetails;
private StepExecutionDetails<PartitionedUrlJobParameters, VoidModel> myStepExecutionDetails;
@Mock
private IJobDataSink<ChunkRangeJson> myJobDataSink;
private static final Date START = BATCH_START_DATE;
private static final Date END = new Date();
@BeforeEach
void setUp() {
@ -48,93 +43,46 @@ public class GenerateRangeChunksStepTest {
}
public static Stream<Arguments> getReindexParameters() {
List<RequestPartitionId> threePartitions = List.of(
RequestPartitionId.fromPartitionId(1),
RequestPartitionId.fromPartitionId(2),
RequestPartitionId.fromPartitionId(3)
);
List<RequestPartitionId> partition1 = List.of(RequestPartitionId.fromPartitionId(1));
RequestPartitionId partition1 = RequestPartitionId.fromPartitionId(1);
RequestPartitionId partition2 = RequestPartitionId.fromPartitionId(2);
// the actual values (URLs, partitionId) don't matter, but we add values similar to real hapi-fhir use-cases
return Stream.of(
Arguments.of(List.of(), List.of(), false, 1),
Arguments.of(List.of(), partition1, false, 1),
Arguments.of(List.of("Observation?"), threePartitions, false, 3),
Arguments.of(List.of("Observation?"), List.of(), false, 1),
Arguments.of(List.of("Observation?"), partition1, true, 1),
Arguments.of(List.of("Observation?", "Patient?"), threePartitions, false, 6),
Arguments.of(List.of("Observation?", "Patient?", "Practitioner?"), threePartitions, true, 3),
Arguments.of(List.of("Observation?status=final", "Patient?"), partition1, false, 2),
Arguments.of(List.of("Observation?status=final"), threePartitions, false, 3)
Arguments.of(List.of()),
Arguments.of(List.of(new PartitionedUrl())),
Arguments.of(List.of(new PartitionedUrl().setUrl("url").setRequestPartitionId(partition1)),
Arguments.of(List.of(
new PartitionedUrl().setUrl("url1").setRequestPartitionId(partition1),
new PartitionedUrl().setUrl("url2").setRequestPartitionId(partition2)))
)
);
}
@ParameterizedTest
@MethodSource(value = "getReindexParameters")
public void run_withParameters_producesExpectedChunks(List<String> theUrls, List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) {
JobParameters parameters = JobParameters.from(theUrls, thePartitions, theShouldAssignPartitionToUrl);
public void run_withParameters_producesExpectedChunks(List<PartitionedUrl> thePartitionedUrls) {
PartitionedUrlJobParameters parameters = new PartitionedUrlJobParameters();
thePartitionedUrls.forEach(parameters::addPartitionedUrl);
when(myStepExecutionDetails.getParameters()).thenReturn(parameters);
myStep.run(myStepExecutionDetails, myJobDataSink);
ArgumentCaptor<ChunkRangeJson> captor = ArgumentCaptor.forClass(ChunkRangeJson.class);
verify(myJobDataSink, times(theExpectedChunkCount)).accept(captor.capture());
int expectedChunkCount = !thePartitionedUrls.isEmpty() ? thePartitionedUrls.size() : 1;
verify(myJobDataSink, times(expectedChunkCount)).accept(captor.capture());
List<ChunkRangeJson> chunkRangeJsonList = getExpectedChunkList(theUrls, thePartitions, theShouldAssignPartitionToUrl, theExpectedChunkCount);
RequestPartitionId[] actualPartitionIds = captor.getAllValues().stream().map(ChunkRangeJson::getPartitionId).toList().toArray(new RequestPartitionId[0]);
RequestPartitionId[] expectedPartitionIds = chunkRangeJsonList.stream().map(ChunkRangeJson::getPartitionId).toList().toArray(new RequestPartitionId[0]);
assertThat(actualPartitionIds).containsExactlyInAnyOrder(expectedPartitionIds);
String[] actualUrls = captor.getAllValues().stream().map(ChunkRangeJson::getUrl).toList().toArray(new String[0]);
String[] expectedUrls = chunkRangeJsonList.stream().map(ChunkRangeJson::getUrl).toList().toArray(new String[0]);
assertThat(actualUrls).containsExactlyInAnyOrder(expectedUrls);
}
private List<ChunkRangeJson> getExpectedChunkList(List<String> theUrls, List<RequestPartitionId> thePartitions,
boolean theShouldAssignPartitionToUrl, int theExpectedChunkCount) {
List<ChunkRangeJson> chunkRangeJsonList = new ArrayList<>();
if (theShouldAssignPartitionToUrl) {
for (int i = 0; i < theExpectedChunkCount; i++) {
String url = theUrls.get(i);
RequestPartitionId partition = thePartitions.get(i);
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
if (thePartitionedUrls.isEmpty()) {
ChunkRangeJson chunkRangeJson = captor.getValue();
assertThat(chunkRangeJson.getUrl()).isNull();
assertThat(chunkRangeJson.getPartitionId()).isNull();
} else {
List<ChunkRangeJson> chunks = captor.getAllValues();
assertThat(chunks).hasSize(thePartitionedUrls.size());
for (int i = 0; i < thePartitionedUrls.size(); i++) {
PartitionedUrl partitionedUrl = thePartitionedUrls.get(i);
ChunkRangeJson chunkRangeJson = captor.getAllValues().get(i);
assertThat(chunkRangeJson.getUrl()).isEqualTo(partitionedUrl.getUrl());
assertThat(chunkRangeJson.getPartitionId()).isEqualTo(partitionedUrl.getRequestPartitionId());
}
return chunkRangeJsonList;
}
if (theUrls.isEmpty() && thePartitions.isEmpty()) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END);
chunkRangeJsonList.add(chunkRangeJson);
return chunkRangeJsonList;
}
if (theUrls.isEmpty()) {
for (RequestPartitionId partition : thePartitions) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
if (thePartitions.isEmpty()) {
for (String url : theUrls) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url);
chunkRangeJsonList.add(chunkRangeJson);
}
return chunkRangeJsonList;
}
theUrls.forEach(url -> {
for (RequestPartitionId partition : thePartitions) {
ChunkRangeJson chunkRangeJson = new ChunkRangeJson(START, END).setUrl(url).setPartitionId(partition);
chunkRangeJsonList.add(chunkRangeJson);
}
});
return chunkRangeJsonList;
}
}

View File

@ -4,7 +4,7 @@ import ca.uhn.fhir.batch2.api.IJobDataSink;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
@ -48,7 +48,7 @@ public class LoadIdsStepTest {
@Mock
private IJobDataSink<ResourceIdListWorkChunkJson> mySink;
private LoadIdsStep<JobParameters> mySvc;
private LoadIdsStep<PartitionedUrlJobParameters> mySvc;
@BeforeEach
public void before() {
@ -60,12 +60,12 @@ public class LoadIdsStepTest {
@Test
public void testGenerateSteps() {
JobParameters parameters = new JobParameters();
PartitionedUrlJobParameters parameters = new PartitionedUrlJobParameters();
ChunkRangeJson range = new ChunkRangeJson(DATE_1, DATE_END);
String instanceId = "instance-id";
JobInstance jobInstance = JobInstance.fromInstanceId(instanceId);
String chunkId = "chunk-id";
StepExecutionDetails<JobParameters, ChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId));
StepExecutionDetails<PartitionedUrlJobParameters, ChunkRangeJson> details = new StepExecutionDetails<>(parameters, range, jobInstance, new WorkChunk().setId(chunkId));
// First Execution

View File

@ -5,7 +5,7 @@ import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson;
import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import ca.uhn.fhir.interceptor.model.RequestPartitionId;
import ca.uhn.fhir.jpa.api.pid.HomogeneousResourcePidList;
import ca.uhn.fhir.jpa.api.pid.IResourcePidStream;
@ -38,18 +38,18 @@ class ResourceIdListStepTest {
@Mock
private IIdChunkProducer<ChunkRangeJson> myIdChunkProducer;
@Mock
private StepExecutionDetails<JobParameters, ChunkRangeJson> myStepExecutionDetails;
private StepExecutionDetails<PartitionedUrlJobParameters, ChunkRangeJson> myStepExecutionDetails;
@Mock
private IJobDataSink<ResourceIdListWorkChunkJson> myDataSink;
@Mock
private ChunkRangeJson myData;
@Mock
private JobParameters myParameters;
private PartitionedUrlJobParameters myParameters;
@Captor
private ArgumentCaptor<ResourceIdListWorkChunkJson> myDataCaptor;
private ResourceIdListStep<JobParameters> myResourceIdListStep;
private ResourceIdListStep<PartitionedUrlJobParameters> myResourceIdListStep;
@BeforeEach
void beforeEach() {

View File

@ -19,7 +19,7 @@
*/
package ca.uhn.fhir.mdm.batch2.clear;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nonnull;
import jakarta.validation.constraints.Pattern;
@ -28,7 +28,7 @@ import org.apache.commons.lang3.Validate;
import java.util.ArrayList;
import java.util.List;
public class MdmClearJobParameters extends JobParameters {
public class MdmClearJobParameters extends PartitionedUrlJobParameters {
@JsonProperty("resourceType")
@Nonnull
private List<@Pattern(regexp = "^[A-Z][A-Za-z]+$", message = "If populated, must be a valid resource type'") String>

View File

@ -19,6 +19,6 @@
*/
package ca.uhn.fhir.mdm.batch2.submit;
import ca.uhn.fhir.batch2.jobs.parameters.JobParameters;
import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters;
public class MdmSubmitJobParameters extends JobParameters {}
public class MdmSubmitJobParameters extends PartitionedUrlJobParameters {}