diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiJpaConfig.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiJpaConfig.java index 8d6d6274be0..69ee799cb5f 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiJpaConfig.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/config/HapiJpaConfig.java @@ -28,7 +28,7 @@ import ca.uhn.fhir.jpa.dao.IFulltextSearchSvc; import ca.uhn.fhir.jpa.dao.search.HSearchSortHelperImpl; import ca.uhn.fhir.jpa.dao.search.IHSearchSortHelper; import ca.uhn.fhir.jpa.provider.DaoRegistryResourceSupportedSvc; -import ca.uhn.fhir.jpa.provider.RehomingProvider; +import ca.uhn.fhir.jpa.provider.RehomeProvider; import ca.uhn.fhir.jpa.search.DatabaseBackedPagingProvider; import ca.uhn.fhir.jpa.search.IStaleSearchDeletingSvc; import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl; @@ -87,7 +87,7 @@ public class HapiJpaConfig { } @Bean - public RehomingProvider rehomingProvider() { - return new RehomingProvider(); + public RehomeProvider rehomeProvider() { + return new RehomeProvider(); } } diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/RehomingProvider.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/RehomeProvider.java similarity index 93% rename from hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/RehomingProvider.java rename to hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/RehomeProvider.java index c8a1697ad2e..5776d7d09ca 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/RehomingProvider.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/provider/RehomeProvider.java @@ -28,7 +28,6 @@ import ca.uhn.fhir.model.api.Include; import ca.uhn.fhir.model.api.annotation.Description; import ca.uhn.fhir.rest.annotation.IdParam; import ca.uhn.fhir.rest.annotation.Operation; -import ca.uhn.fhir.rest.annotation.OperationParam; import ca.uhn.fhir.rest.api.MethodOutcome; import ca.uhn.fhir.rest.api.PatchTypeEnum; import ca.uhn.fhir.rest.api.server.RequestDetails; @@ -40,7 +39,6 @@ import jakarta.annotation.Nonnull; import org.hl7.fhir.instance.model.api.IBaseParameters; import org.hl7.fhir.instance.model.api.IBaseResource; import org.hl7.fhir.instance.model.api.IIdType; -import org.hl7.fhir.instance.model.api.IPrimitiveType; import org.hl7.fhir.r4.model.CodeType; import org.hl7.fhir.r4.model.Parameters; import org.hl7.fhir.r4.model.Reference; @@ -65,10 +63,9 @@ import static ca.uhn.fhir.jpa.patch.FhirPatch.PARAMETER_VALUE; import static ca.uhn.fhir.rest.api.Constants.PARAM_ID; import static software.amazon.awssdk.utils.StringUtils.isBlank; -public class RehomingProvider { - private static final Logger ourLog = LoggerFactory.getLogger(RehomingProvider.class); +public class RehomeProvider { - public static final String ONLINE_REFERENCE_COUNT_LIMIT = "online-reference-count-limit"; + public static final int BATCH_REFERENCE_COUNT_THRESHOLD = 100; @Autowired private FhirContext myFhirContext; @@ -76,16 +73,16 @@ public class RehomingProvider { @Autowired private DaoRegistry myDaoRegistry; + int myBatchReferenceCountThreshold = BATCH_REFERENCE_COUNT_THRESHOLD; + @Description( value = "This operation repoints referenced resources to a new target resource instance of the same previously pointed type.", shortDefinition = "Repoints referencing resources to another resources instance") - @Operation(name = ProviderConstants.OPERATION_REHOMING, global = true) - public IBaseParameters rehoming( + @Operation(name = ProviderConstants.OPERATION_REHOME, global = true) + public IBaseParameters rehome( @IdParam IIdType theCurrentTargetIdParam, @IdParam IIdType theNewTargetIdParam, - @OperationParam(name = ONLINE_REFERENCE_COUNT_LIMIT, typeName = "integer") - IPrimitiveType theRefCountLimit, RequestDetails theRequest) { validate(theCurrentTargetIdParam, theNewTargetIdParam, theRequest); @@ -94,7 +91,7 @@ public class RehomingProvider { findReferencingResourceIds(theCurrentTargetIdParam, theRequest); // operation is performed online or batched depending on the number of references to patch - return referencingResources.size() <= theRefCountLimit.getValue() + return referencingResources.size() > myBatchReferenceCountThreshold ? rehomeInTransaction(referencingResources, theCurrentTargetIdParam, theNewTargetIdParam, theRequest) : rehomingBatch(referencingResources, theCurrentTargetIdParam, theNewTargetIdParam, theRequest); } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/SystemProviderR4Test.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/SystemProviderR4Test.java index 3e97e108f38..eef96d26ab6 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/SystemProviderR4Test.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/provider/r4/SystemProviderR4Test.java @@ -9,7 +9,7 @@ import ca.uhn.fhir.interceptor.api.IAnonymousInterceptor; import ca.uhn.fhir.interceptor.api.IPointcut; import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; -import ca.uhn.fhir.jpa.provider.RehomingProvider; +import ca.uhn.fhir.jpa.provider.RehomeProvider; import ca.uhn.fhir.jpa.rp.r4.AppointmentResourceProvider; import ca.uhn.fhir.jpa.rp.r4.BinaryResourceProvider; import ca.uhn.fhir.jpa.rp.r4.DiagnosticReportResourceProvider; @@ -130,7 +130,7 @@ public class SystemProviderR4Test extends BaseJpaR4Test { private SimpleRequestHeaderInterceptor mySimpleHeaderInterceptor; @Autowired - RehomingProvider myRehomingProvider; + RehomeProvider myRehomeProvider; @Autowired private DeleteExpungeProvider myDeleteExpungeProvider; @@ -1009,7 +1009,7 @@ public class SystemProviderR4Test extends BaseJpaR4Test { } @Nested - class RehomingOperation { + class RehomeOperation { @BeforeEach void setUp() { @@ -1079,7 +1079,7 @@ public class SystemProviderR4Test extends BaseJpaR4Test { // execute RequestDetails requestDetails = new SystemRequestDetails(); requestDetails.setResourceName("Patient"); - Parameters result = (Parameters) myRehomingProvider.rehoming(currentTargetPatientId, newTargetPatientId, new IntegerType(100), requestDetails); + Parameters result = (Parameters) myRehomeProvider.rehome(currentTargetPatientId, newTargetPatientId, requestDetails); // verify validateResultDiagnostics(result, sourceResourceIds); @@ -1104,7 +1104,7 @@ public class SystemProviderR4Test extends BaseJpaR4Test { // execute RequestDetails requestDetails = new SystemRequestDetails(); requestDetails.setResourceName("Observation"); - Parameters result = (Parameters) myRehomingProvider.rehoming(currentTargetObsId, newTargetObsId, new IntegerType(100), requestDetails); + Parameters result = (Parameters) myRehomeProvider.rehome(currentTargetObsId, newTargetObsId, requestDetails); // verify validateResultDiagnostics(result, List.of(sourceObsId)); diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/IRehomeJobSubmitter.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/IRehomeJobSubmitter.java new file mode 100644 index 00000000000..7b3a95012a0 --- /dev/null +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/api/server/storage/IRehomeJobSubmitter.java @@ -0,0 +1,34 @@ +/*- + * #%L + * HAPI FHIR - Server Framework + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.rest.api.server.storage; + +import ca.uhn.fhir.rest.api.server.RequestDetails; + +import java.util.List; + +public interface IRehomeJobSubmitter { + /** + * @param theBatchSize For each pass, when synchronously searching for resources, + * limit the number of matching resources to this number + * @param theUrlsToProcess A list of strings of the form "/Patient/123" or equivalent + * @return The Batch2 JobId that was started to run this batch job + */ + String submitJob(Integer theBatchSize, List theUrlsToProcess, RequestDetails theRequest); +} diff --git a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java index 0b3ff6926a6..2a178deb37f 100644 --- a/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java +++ b/hapi-fhir-server/src/main/java/ca/uhn/fhir/rest/server/provider/ProviderConstants.java @@ -248,5 +248,6 @@ public class ProviderConstants { /** * Operation name for the "$rehoming" operation */ - public static final String OPERATION_REHOMING = "$rehoming"; + public static final String OPERATION_REHOME = "$rehome" + + ""; } diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeAppCtx.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeAppCtx.java new file mode 100644 index 00000000000..54a57960e71 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeAppCtx.java @@ -0,0 +1,99 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.rehoming; + +import ca.uhn.fhir.batch2.jobs.chunk.ChunkRangeJson; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.batch2.jobs.parameters.UrlListValidator; +import ca.uhn.fhir.batch2.jobs.step.GenerateRangeChunksStep; +import ca.uhn.fhir.batch2.jobs.step.LoadIdsStep; +import ca.uhn.fhir.batch2.model.JobDefinition; +import ca.uhn.fhir.jpa.api.svc.IBatch2DaoSvc; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RehomeAppCtx { + + public static final String JOB_REHOME = "REHOME"; + + @Bean + public JobDefinition rehomeJobDefinition( + IBatch2DaoSvc theBatch2DaoSvc, + HapiTransactionService theHapiTransactionService, + IIdHelperService theIdHelperService, + IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { + + return JobDefinition.newBuilder() + .setJobDefinitionId(JOB_REHOME) + .setJobDescription("Rehome references") + .setJobDefinitionVersion(1) + .setParametersType(RehomeJobParameters.class) + .setParametersValidator(rehomeParametersValidator( + theBatch2DaoSvc, theRequestPartitionHelperSvc)) + .addFirstStep( + "generate-ranges", + "Generate data ranges to rehome", + ChunkRangeJson.class, + rehomeGenerateRangeChunksStep()) + .addIntermediateStep( + "load-ids", + "Load IDs of resources to rehome", + ResourceIdListWorkChunkJson.class, + rehomeLoadIdsStep(theBatch2DaoSvc)) + .addLastStep( + "rehome", + "Perform the reference rehoming", + rehomeStep(theHapiTransactionService, theIdHelperService)) + .build(); + } + + @Bean + public RehomeJobParametersValidator rehomeParametersValidator( + IBatch2DaoSvc theBatch2DaoSvc, + IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { + return new RehomeJobParametersValidator( + new UrlListValidator(ProviderConstants.OPERATION_REHOME, theBatch2DaoSvc), + theRequestPartitionHelperSvc); + } + + @Bean + public LoadIdsStep rehomeLoadIdsStep(IBatch2DaoSvc theBatch2DaoSvc) { + return new LoadIdsStep<>(theBatch2DaoSvc); + } + + @Bean + public RehomeStep rehomeStep( + HapiTransactionService theHapiTransactionService, + IIdHelperService theIdHelperService) { + return new RehomeStep(theHapiTransactionService, theIdHelperService); + } + + @Bean + public GenerateRangeChunksStep rehomeGenerateRangeChunksStep() { + return new GenerateRangeChunksStep<>(); + } + +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobParameters.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobParameters.java new file mode 100644 index 00000000000..e7b750649d1 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobParameters.java @@ -0,0 +1,54 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.rehoming; + +import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrlJobParameters; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RehomeJobParameters extends PartitionedUrlJobParameters { + @JsonProperty("cascade") + private boolean myCascade; + + @JsonProperty("cascadeMaxRounds") + private Integer myCascadeMaxRounds; + + /** + * Constructor + */ + public RehomeJobParameters() { + super(); + } + + public Integer getCascadeMaxRounds() { + return myCascadeMaxRounds; + } + + public void setCascadeMaxRounds(Integer theCascadeMaxRounds) { + myCascadeMaxRounds = theCascadeMaxRounds; + } + + public boolean isCascade() { + return myCascade; + } + + public void setCascade(boolean theCascade) { + myCascade = theCascade; + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobParametersValidator.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobParametersValidator.java new file mode 100644 index 00000000000..57eae1d5235 --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobParametersValidator.java @@ -0,0 +1,62 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.rehoming; + +import ca.uhn.fhir.batch2.api.IJobParametersValidator; +import ca.uhn.fhir.batch2.jobs.parameters.IUrlListValidator; +import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.util.ValidateUtil; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; + +import java.util.List; + +public class RehomeJobParametersValidator implements IJobParametersValidator { + private final IUrlListValidator myUrlListValidator; + private final IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + + public RehomeJobParametersValidator( + IUrlListValidator theUrlListValidator, + IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { + myUrlListValidator = theUrlListValidator; + myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; + } + + @Nullable + @Override + public List validate(RequestDetails theRequestDetails, @Nonnull RehomeJobParameters theParameters) { + + // Verify that the user has access to all requested partitions + for (PartitionedUrl partitionedUrl : theParameters.getPartitionedUrls()) { + String url = partitionedUrl.getUrl(); + ValidateUtil.isTrueOrThrowInvalidRequest( + url.matches("[a-zA-Z]+\\?.*"), + "Rehoming URLs must be in the format [resourceType]?[parameters]"); + + if (partitionedUrl.getRequestPartitionId() != null) { + myRequestPartitionHelperSvc.validateHasPartitionPermissions( + theRequestDetails, null, partitionedUrl.getRequestPartitionId()); + } + } + return myUrlListValidator.validateUrls(theParameters.getUrls()); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobSubmitterImpl.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobSubmitterImpl.java new file mode 100644 index 00000000000..9ce46222a6a --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeJobSubmitterImpl.java @@ -0,0 +1,78 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.rehoming; + +import ca.uhn.fhir.batch2.api.IJobCoordinator; +import ca.uhn.fhir.batch2.jobs.parameters.PartitionedUrl; +import ca.uhn.fhir.batch2.jobs.parameters.UrlPartitioner; +import ca.uhn.fhir.batch2.model.JobInstanceStartRequest; +import ca.uhn.fhir.interceptor.model.RequestPartitionId; +import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse; +import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.storage.IRehomeJobSubmitter; +import ca.uhn.fhir.rest.server.provider.ProviderConstants; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +public class RehomeJobSubmitterImpl implements IRehomeJobSubmitter { + @Autowired + IJobCoordinator myJobCoordinator; + + @Autowired + IRequestPartitionHelperSvc myRequestPartitionHelperSvc; + + @Autowired + UrlPartitioner myUrlPartitioner; + + @Override + @Transactional(propagation = Propagation.NEVER) + public String submitJob( + Integer theBatchSize, + List theUrlsToRehome, + RequestDetails theRequestDetails) { + + RehomeJobParameters rehomeJobParameters = new RehomeJobParameters(); + // Set partition for each url since resource type can determine partition + theUrlsToRehome.stream() + .filter(StringUtils::isNotBlank) + .map(url -> myUrlPartitioner.partitionUrl(url, theRequestDetails)) + .forEach(rehomeJobParameters::addPartitionedUrl); + rehomeJobParameters.setBatchSize(theBatchSize); + + if (theUrlsToRehome.isEmpty()) { // fix for https://github.com/hapifhir/hapi-fhir/issues/6179 + RequestPartitionId requestPartition = + myRequestPartitionHelperSvc.determineReadPartitionForRequestForServerOperation( + theRequestDetails, ProviderConstants.OPERATION_REHOME); + rehomeJobParameters.addPartitionedUrl(new PartitionedUrl().setRequestPartitionId(requestPartition)); + } + rehomeJobParameters.setCascade(false); + + JobInstanceStartRequest startRequest = new JobInstanceStartRequest(); + startRequest.setJobDefinitionId(RehomeAppCtx.JOB_REHOME); + startRequest.setParameters(rehomeJobParameters); + Batch2JobStartResponse startResponse = myJobCoordinator.startInstance(theRequestDetails, startRequest); + return startResponse.getInstanceId(); + } +} diff --git a/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeStep.java b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeStep.java new file mode 100644 index 00000000000..4e7a97a61cc --- /dev/null +++ b/hapi-fhir-storage-batch2-jobs/src/main/java/ca/uhn/fhir/batch2/jobs/rehoming/RehomeStep.java @@ -0,0 +1,163 @@ +/*- + * #%L + * hapi-fhir-storage-batch2-jobs + * %% + * Copyright (C) 2014 - 2024 Smile CDR, Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package ca.uhn.fhir.batch2.jobs.rehoming; + +import ca.uhn.fhir.batch2.api.IJobDataSink; +import ca.uhn.fhir.batch2.api.IJobStepWorker; +import ca.uhn.fhir.batch2.api.JobExecutionFailedException; +import ca.uhn.fhir.batch2.api.RunOutcome; +import ca.uhn.fhir.batch2.api.StepExecutionDetails; +import ca.uhn.fhir.batch2.api.VoidModel; +import ca.uhn.fhir.batch2.jobs.chunk.ResourceIdListWorkChunkJson; +import ca.uhn.fhir.jpa.api.svc.IIdHelperService; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; + +import java.util.List; + +public class RehomeStep + implements IJobStepWorker { + + private static final Logger ourLog = LoggerFactory.getLogger(RehomeStep.class); + private final HapiTransactionService myHapiTransactionService; + private final IIdHelperService myIdHelperService; + + public RehomeStep( + HapiTransactionService theHapiTransactionService, + IIdHelperService theIdHelperService) { + myHapiTransactionService = theHapiTransactionService; + myIdHelperService = theIdHelperService; + } + + @Nonnull + @Override + public RunOutcome run( + @Nonnull StepExecutionDetails theStepExecutionDetails, + @Nonnull IJobDataSink theDataSink) + throws JobExecutionFailedException { + + ResourceIdListWorkChunkJson data = theStepExecutionDetails.getData(); + + boolean cascade = theStepExecutionDetails.getParameters().isCascade(); + Integer cascadeMaxRounds = theStepExecutionDetails.getParameters().getCascadeMaxRounds(); + return doRehome( + data, + theDataSink, + theStepExecutionDetails.getInstance().getInstanceId(), + theStepExecutionDetails.getChunkId(), + cascade, + cascadeMaxRounds); + } + + @Nonnull + public RunOutcome doRehome( + ResourceIdListWorkChunkJson theData, + IJobDataSink theDataSink, + String theInstanceId, + String theChunkId, + boolean theCascade, + Integer theCascadeMaxRounds) { + RequestDetails requestDetails = new SystemRequestDetails(); + TransactionDetails transactionDetails = new TransactionDetails(); + RehomeJob job = new RehomeJob( + theData, + requestDetails, + transactionDetails, + theDataSink, + theInstanceId, + theChunkId, + theCascade, + theCascadeMaxRounds); + myHapiTransactionService + .withRequest(requestDetails) + .withTransactionDetails(transactionDetails) + .withRequestPartitionId(theData.getRequestPartitionId()) + .execute(job); + + return new RunOutcome(job.getRecordCount()); + } + + private class RehomeJob implements TransactionCallback { + private final ResourceIdListWorkChunkJson myData; + private final RequestDetails myRequestDetails; + private final TransactionDetails myTransactionDetails; + private final IJobDataSink myDataSink; + private final String myChunkId; + private final String myInstanceId; + private final boolean myCascade; + private final Integer myCascadeMaxRounds; + private int myRecordCount; + + public RehomeJob( + ResourceIdListWorkChunkJson theData, + RequestDetails theRequestDetails, + TransactionDetails theTransactionDetails, + IJobDataSink theDataSink, + String theInstanceId, + String theChunkId, + boolean theCascade, + Integer theCascadeMaxRounds) { + myData = theData; + myRequestDetails = theRequestDetails; + myTransactionDetails = theTransactionDetails; + myDataSink = theDataSink; + myInstanceId = theInstanceId; + myChunkId = theChunkId; + myCascade = theCascade; + myCascadeMaxRounds = theCascadeMaxRounds; + } + + public int getRecordCount() { + return myRecordCount; + } + + @Override + public Void doInTransaction(@Nonnull TransactionStatus theStatus) { + + List persistentIds = myData.getResourcePersistentIds(myIdHelperService); + + if (persistentIds.isEmpty()) { + ourLog.info( + "Starting rehoming work chunk. There are no resources to rehome - Instance[{}] Chunk[{}]", + myInstanceId, + myChunkId); + return null; + } + + ourLog.info( + "Starting rehoming chunk with {} resources - Instance[{}] Chunk[{}]", + persistentIds.size(), + myInstanceId, + myChunkId); + + myRecordCount = myRehomeSvc.rehome(persistentIds, myCascade, myCascadeMaxRounds); + + return null; + } + } +}