Target the default partition for batch2 storage. (#5475)

This commit is contained in:
Michael Buckley 2023-11-21 19:12:38 -05:00 committed by GitHub
parent f519f17fc5
commit 9aa173a981
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 105 additions and 81 deletions

View File

@ -0,0 +1,4 @@
---
type: fix
issue: 5475
title: "Ensure batch2 jpa persistence always targets the default partition."

View File

@ -115,7 +115,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
ourLog.trace(
"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
myWorkChunkRepository.save(entity);
myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));
return entity.getId();
}
@ -209,7 +209,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
@Nonnull
public Optional<JobInstance> fetchInstance(String theInstanceId) {
return myTransactionService
.withSystemRequest()
.withSystemRequestOnDefaultPartition()
.execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
}
@ -225,7 +225,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
List<Batch2JobInstanceEntity> instanceEntities;
if (statuses != null && !statuses.isEmpty()) {
if (definitionId.equals(Batch2JobDefinitionConstants.BULK_EXPORT)) {
if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
if (originalRequestUrlTruncation(params) != null) {
params = originalRequestUrlTruncation(params);
}
@ -268,18 +268,22 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
// default sort is myCreateTime Asc
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
return myJobInstanceRepository.findAll(pageRequest).stream()
.map(this::toInstance)
.collect(Collectors.toList());
return myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
.map(this::toInstance)
.collect(Collectors.toList()));
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
return myJobInstanceRepository.findAll(pageRequest).stream()
.map(this::toInstance)
.collect(Collectors.toList());
return myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
.map(this::toInstance)
.collect(Collectors.toList()));
}
private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
@ -295,44 +299,49 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
String chunkId = theParameters.getChunkId();
String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());
int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);
Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
+ ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
+ "where myId = :chunkId and myErrorCount > :maxCount");
query.setParameter("chunkId", chunkId);
query.setParameter("failed", WorkChunkStatusEnum.FAILED);
query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
int failChangeCount = query.executeUpdate();
return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
Validate.isTrue(changeCount > 0, "changed chunk matching %s", chunkId);
if (failChangeCount > 0) {
return WorkChunkStatusEnum.FAILED;
} else {
return WorkChunkStatusEnum.ERRORED;
}
Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity " + "set myStatus = :failed "
+ ",myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
+ "where myId = :chunkId and myErrorCount > :maxCount");
query.setParameter("chunkId", chunkId);
query.setParameter("failed", WorkChunkStatusEnum.FAILED);
query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
int failChangeCount = query.executeUpdate();
if (failChangeCount > 0) {
return WorkChunkStatusEnum.FAILED;
} else {
return WorkChunkStatusEnum.ERRORED;
}
});
}
@Override
@Transactional
public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
String errorMessage = truncateErrorMessage(theErrorMessage);
myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED);
myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
}
@Override
@Transactional
public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
theEvent.getChunkId(),
new Date(),
theEvent.getRecordsProcessed(),
theEvent.getRecoveredErrorCount(),
WorkChunkStatusEnum.COMPLETED,
theEvent.getRecoveredWarningMessage());
myTransactionService
.withSystemRequestOnDefaultPartition()
.execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
theEvent.getChunkId(),
new Date(),
theEvent.getRecordsProcessed(),
theEvent.getRecoveredErrorCount(),
WorkChunkStatusEnum.COMPLETED,
theEvent.getRecoveredWarningMessage()));
}
@Nullable
@ -389,7 +398,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
int thePageIndex,
Consumer<WorkChunk> theConsumer) {
myTransactionService
.withSystemRequest()
.withSystemRequestOnDefaultPartition()
.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> {
List<Batch2WorkChunkEntity> chunks;

View File

@ -27,10 +27,10 @@ import ca.uhn.fhir.batch2.model.JobInstanceStartRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.batch.models.Batch2JobStartResponse;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.jpa.util.TestUtil;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.params.provider.Arguments;
@ -52,6 +52,7 @@ public class PatientReindexTestHelper {
private final Batch2JobHelper myBatch2JobHelper;
private final IFhirResourceDao<Patient> myPatientDao;
private final boolean myIncrementVersionOnReindex;
private final RequestDetails myRequestDetails = new SystemRequestDetails();
public static Stream<Arguments> numResourcesParams(){
return Stream.of(
@ -79,7 +80,7 @@ public class PatientReindexTestHelper {
// Reindex 1
JobInstanceStartRequest reindexRequest1 = createPatientReindexRequest(theNumResources);
Batch2JobStartResponse reindexResponse1 = myJobCoordinator.startInstance(reindexRequest1);
Batch2JobStartResponse reindexResponse1 = myJobCoordinator.startInstance(myRequestDetails, reindexRequest1);
JobInstance instance1 = myBatch2JobHelper.awaitJobHasStatus(reindexResponse1.getInstanceId(), JOB_WAIT_TIME, StatusEnum.COMPLETED);
validateReindexJob(instance1, theNumResources);
@ -95,7 +96,7 @@ public class PatientReindexTestHelper {
// Reindex 1
JobInstanceStartRequest reindexRequest1 = createPatientReindexRequest(theNumResources);
Batch2JobStartResponse reindexResponse1 = myJobCoordinator.startInstance(reindexRequest1);
Batch2JobStartResponse reindexResponse1 = myJobCoordinator.startInstance(myRequestDetails, reindexRequest1);
JobInstance instance1 = myBatch2JobHelper.awaitJobHasStatus(reindexResponse1.getInstanceId(), JOB_WAIT_TIME, StatusEnum.COMPLETED);
validateReindexJob(instance1, theNumResources);
@ -104,7 +105,7 @@ public class PatientReindexTestHelper {
// Reindex 2
JobInstanceStartRequest reindexRequest2 = createPatientReindexRequest(theNumResources);
Batch2JobStartResponse reindexResponse2 = myJobCoordinator.startInstance(reindexRequest2);
Batch2JobStartResponse reindexResponse2 = myJobCoordinator.startInstance(myRequestDetails, reindexRequest2);
JobInstance instance2 = myBatch2JobHelper.awaitJobHasStatus(reindexResponse2.getInstanceId(), JOB_WAIT_TIME, StatusEnum.COMPLETED);
validateReindexJob(instance2, theNumResources);
@ -119,11 +120,11 @@ public class PatientReindexTestHelper {
// Reindex 1
JobInstanceStartRequest reindexRequest1 = createPatientReindexRequest(theNumResources);
Batch2JobStartResponse reindexResponse1 = myJobCoordinator.startInstance(reindexRequest1);
Batch2JobStartResponse reindexResponse1 = myJobCoordinator.startInstance(myRequestDetails, reindexRequest1);
// Reindex 2
JobInstanceStartRequest reindexRequest2 = createPatientReindexRequest(theNumResources);
Batch2JobStartResponse reindexResponse2 = myJobCoordinator.startInstance(reindexRequest2);
Batch2JobStartResponse reindexResponse2 = myJobCoordinator.startInstance(myRequestDetails, reindexRequest2);
// Wait for jobs to finish
JobInstance instance1 = myBatch2JobHelper.awaitJobHasStatus(reindexResponse1.getInstanceId(), JOB_WAIT_TIME, StatusEnum.COMPLETED);
@ -170,7 +171,7 @@ public class PatientReindexTestHelper {
startRequest.setJobDefinitionId(ReindexAppCtx.JOB_REINDEX);
ReindexJobParameters reindexJobParameters = new ReindexJobParameters();
reindexJobParameters.setBatchSize(theBatchSize);
reindexJobParameters.setBatchSize(Math.max(theBatchSize,1));
reindexJobParameters.addUrl("Patient?");
startRequest.setParameters(reindexJobParameters);

View File

@ -60,7 +60,6 @@ public interface IWorkChunkPersistence {
* @param theBatchWorkChunk the batch work chunk to be stored
* @return a globally unique identifier for this chunk.
*/
@Transactional(propagation = Propagation.REQUIRED)
String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk);
/**
@ -71,7 +70,7 @@ public interface IWorkChunkPersistence {
* @param theChunkId The ID from {@link #onWorkChunkCreate}
* @return The WorkChunk or empty if no chunk exists, or it is not in a runnable state (QUEUED or ERRORED)
*/
@Transactional(propagation = Propagation.REQUIRED)
@Transactional(propagation = Propagation.MANDATORY)
Optional<WorkChunk> onWorkChunkDequeue(String theChunkId);
/**

View File

@ -43,7 +43,7 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageHandler;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.Arrays;
@ -143,7 +143,7 @@ public class JobCoordinatorImpl implements IJobCoordinator {
myJobParameterJsonValidator.validateJobParameters(theRequestDetails, theStartRequest, jobDefinition);
IJobPersistence.CreateResult instanceAndFirstChunk = myTransactionService
.withSystemRequest()
.withSystemRequestOnDefaultPartition()
.execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));
JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(
@ -163,7 +163,7 @@ public class JobCoordinatorImpl implements IJobCoordinator {
*/
private void sendBatchJobWorkNotificationAfterCommit(final JobWorkNotification theJobWorkNotification) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public int getOrder() {
return 0;

View File

@ -231,46 +231,50 @@ class WorkChannelMessageHandler implements MessageHandler {
// We use Optional chaining here to simplify all the cases where we short-circuit exit.
// A step that returns an empty Optional means discard the chunk.
//
executeInTxRollbackWhenEmpty(() ->
(
// Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
Optional.of(new MessageProcess(workNotification))
// validate and load info
.flatMap(MessageProcess::validateChunkId)
// no job definition should be retried - we must be a stale process encountering a new
// job definition.
.flatMap(MessageProcess::loadJobDefinitionOrThrow)
.flatMap(MessageProcess::loadJobInstance)
// update statuses now in the db: QUEUED->IN_PROGRESS
.flatMap(MessageProcess::updateChunkStatusAndValidate)
.flatMap(MessageProcess::updateAndValidateJobStatus)
// ready to execute
.flatMap(MessageProcess::buildCursor)
.flatMap(MessageProcess::buildStepExecutor)))
.ifPresentOrElse(
// all the setup is happy and committed. Do the work.
process -> process.myStepExector.executeStep(),
// discard the chunk
() -> ourLog.debug("Discarding chunk notification {}", workNotification));
Optional<MessageProcess> processingPreparation = executeInTxRollbackWhenEmpty(() ->
// Use a chain of Optional flatMap to handle all the setup short-circuit exits cleanly.
Optional.of(new MessageProcess(workNotification))
// validate and load info
.flatMap(MessageProcess::validateChunkId)
// no job definition should be retried - we must be a stale process encountering a new
// job definition.
.flatMap(MessageProcess::loadJobDefinitionOrThrow)
.flatMap(MessageProcess::loadJobInstance)
// update statuses now in the db: QUEUED->IN_PROGRESS
.flatMap(MessageProcess::updateChunkStatusAndValidate)
.flatMap(MessageProcess::updateAndValidateJobStatus)
// ready to execute
.flatMap(MessageProcess::buildCursor)
.flatMap(MessageProcess::buildStepExecutor));
processingPreparation.ifPresentOrElse(
// all the setup is happy and committed. Do the work.
process -> process.myStepExector.executeStep(),
// discard the chunk
() -> ourLog.debug("Discarding chunk notification {}", workNotification));
}
/**
* Run theCallback in TX, rolling back if the supplied Optional is empty.
*/
<T> Optional<T> executeInTxRollbackWhenEmpty(Supplier<Optional<T>> theCallback) {
return myHapiTransactionService.withSystemRequest().execute(theTransactionStatus -> {
return myHapiTransactionService
// batch storage is not partitioned.
.withSystemRequestOnDefaultPartition()
.execute(theTransactionStatus -> {
// run the processing
Optional<T> setupProcessing = theCallback.get();
// run the processing
Optional<T> setupProcessing = theCallback.get();
if (setupProcessing.isEmpty()) {
// If any setup failed, roll back the chunk and instance status changes.
ourLog.debug("WorkChunk setup failed - rollback tx");
theTransactionStatus.setRollbackOnly();
}
// else COMMIT the work.
if (setupProcessing.isEmpty()) {
// If any setup failed, roll back the chunk and instance status changes.
ourLog.debug("WorkChunk setup failed - rollback tx");
theTransactionStatus.setRollbackOnly();
}
// else COMMIT the work.
return setupProcessing;
});
return setupProcessing;
});
}
}

View File

@ -70,10 +70,17 @@ public interface IHapiTransactionService {
return withSystemRequest().withRequestPartitionId(theRequestPartitionId);
}
/**
* Convenience for transactions on non-partitioned entities: runs with a system request targeting the default partition.
*/
default IExecutionBuilder withSystemRequestOnDefaultPartition() {
return withSystemRequestOnPartition(RequestPartitionId.defaultPartition());
}
/**
* @deprecated It is highly recommended to use {@link #withRequest(RequestDetails)} instead of this method, for increased visibility.
*/
@Deprecated
@Deprecated(since = "6.10")
<T> T withRequest(
@Nullable RequestDetails theRequestDetails,
@Nullable TransactionDetails theTransactionDetails,