Batch - ensure we target the default partition for non-gated jobs (#5496)

* Target the default partition for batch2 storage.
Michael Buckley 2023-11-27 17:32:58 -05:00 committed by GitHub
parent 4cc4682245
commit 83bfa817ae
9 changed files with 47 additions and 11 deletions

View File

@@ -0,0 +1,4 @@
+---
+type: fix
+issue: 5496
+title: "Ensure batch jobs target the default partition for non-gated steps."

View File

@@ -100,6 +100,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
}
@Override
+@Transactional(propagation = Propagation.REQUIRED)
public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
entity.setId(UUID.randomUUID().toString());

View File

@@ -48,10 +48,13 @@ public abstract class BaseBatch2Config {
public static final String CHANNEL_NAME = "batch2-work-notification";
@Autowired
-private IJobPersistence myPersistence;
+IJobPersistence myPersistence;
@Autowired
-private IChannelFactory myChannelFactory;
+IChannelFactory myChannelFactory;
+@Autowired
+IHapiTransactionService myHapiTransactionService;
@Bean
public JobDefinitionRegistry batch2JobDefinitionRegistry() {
@@ -60,7 +63,7 @@ public abstract class BaseBatch2Config {
@Bean
public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) {
-return new WorkChunkProcessor(myPersistence, theBatchJobSender);
+return new WorkChunkProcessor(myPersistence, theBatchJobSender, myHapiTransactionService);
}
@Bean

View File

@@ -43,6 +43,7 @@ import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.messaging.MessageHandler;
+import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@@ -144,6 +145,7 @@ public class JobCoordinatorImpl implements IJobCoordinator {
IJobPersistence.CreateResult instanceAndFirstChunk = myTransactionService
.withSystemRequestOnDefaultPartition()
+.withPropagation(Propagation.REQUIRES_NEW)
.execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters()));
JobWorkNotification workNotification = JobWorkNotification.firstStepNotification(

View File

@@ -28,10 +28,12 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.i18n.Msg;
+import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import ca.uhn.fhir.util.Logs;
import org.slf4j.Logger;
+import org.springframework.transaction.annotation.Propagation;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,13 +51,15 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
private final AtomicInteger myChunkCounter = new AtomicInteger(0);
private final AtomicReference<String> myLastChunkId = new AtomicReference<>();
private final boolean myGatedExecution;
+private final IHapiTransactionService myHapiTransactionService;
JobDataSink(
@Nonnull BatchJobSender theBatchJobSender,
@Nonnull IJobPersistence theJobPersistence,
@Nonnull JobDefinition<?> theDefinition,
@Nonnull String theInstanceId,
-@Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor) {
+@Nonnull JobWorkCursor<PT, IT, OT> theJobWorkCursor,
+IHapiTransactionService theHapiTransactionService) {
super(theInstanceId, theJobWorkCursor);
myBatchJobSender = theBatchJobSender;
myJobPersistence = theJobPersistence;
@@ -63,6 +67,7 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
myJobDefinitionVersion = theDefinition.getJobDefinitionVersion();
myTargetStep = theJobWorkCursor.nextStep;
myGatedExecution = theDefinition.isGatedExecution();
+myHapiTransactionService = theHapiTransactionService;
}
@Override
@@ -76,7 +81,11 @@ class JobDataSink<PT extends IModelJson, IT extends IModelJson, OT extends IMode
WorkChunkCreateEvent batchWorkChunk = new WorkChunkCreateEvent(
myJobDefinitionId, myJobDefinitionVersion, targetStepId, instanceId, sequence, dataValueString);
-String chunkId = myJobPersistence.onWorkChunkCreate(batchWorkChunk);
+String chunkId = myHapiTransactionService
+.withSystemRequestOnDefaultPartition()
+.withPropagation(Propagation.REQUIRES_NEW)
+.execute(() -> myJobPersistence.onWorkChunkCreate(batchWorkChunk));
myLastChunkId.set(chunkId);
if (!myGatedExecution) {

View File

@@ -29,6 +29,7 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.WorkChunk;
+import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import org.apache.commons.lang3.Validate;
@@ -55,11 +56,16 @@ public class WorkChunkProcessor {
private final IJobPersistence myJobPersistence;
private final BatchJobSender myBatchJobSender;
private final StepExecutor myStepExecutor;
+private final IHapiTransactionService myHapiTransactionService;
-public WorkChunkProcessor(IJobPersistence theJobPersistence, BatchJobSender theSender) {
+public WorkChunkProcessor(
+IJobPersistence theJobPersistence,
+BatchJobSender theSender,
+IHapiTransactionService theHapiTransactionService) {
myJobPersistence = theJobPersistence;
myBatchJobSender = theSender;
myStepExecutor = new StepExecutor(theJobPersistence);
+myHapiTransactionService = theHapiTransactionService;
}
/**
@@ -118,8 +124,13 @@ public class WorkChunkProcessor {
dataSink = (BaseDataSink<PT, IT, OT>) new FinalStepDataSink<>(
theJobDefinition.getJobDefinitionId(), theInstanceId, theCursor.asFinalCursor());
} else {
-dataSink =
-new JobDataSink<>(myBatchJobSender, myJobPersistence, theJobDefinition, theInstanceId, theCursor);
+dataSink = new JobDataSink<>(
+myBatchJobSender,
+myJobPersistence,
+theJobDefinition,
+theInstanceId,
+theCursor,
+myHapiTransactionService);
}
return dataSink;
}

View File

@@ -94,7 +94,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test {
public void beforeEach() {
// The code refactored to keep the same functionality,
// but in this service (so it's a real service here!)
-WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender);
+WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, new NonTransactionalHapiTransactionService());
mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService);
}

View File

@@ -14,6 +14,8 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.JobWorkNotification;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
+import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
+import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,6 +33,7 @@ import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -54,6 +57,7 @@ class JobDataSinkTest {
private ArgumentCaptor<JobWorkNotification> myJobWorkNotificationCaptor;
@Captor
private ArgumentCaptor<WorkChunkCreateEvent> myBatchWorkChunkCaptor;
+private final IHapiTransactionService myHapiTransactionService = new NonTransactionalHapiTransactionService();
@Test
public void test_sink_accept() {
@@ -98,7 +102,7 @@ class JobDataSinkTest {
JobInstance instance = JobInstance.fromInstanceId(JOB_INSTANCE_ID);
StepExecutionDetails<TestJobParameters, VoidModel> details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, instance, CHUNK_ID);
JobWorkCursor<TestJobParameters, VoidModel, Step1Output> cursor = new JobWorkCursor<>(job, true, firstStep, lastStep);
-JobDataSink<TestJobParameters, VoidModel, Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor);
+JobDataSink<TestJobParameters, VoidModel, Step1Output> sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor, myHapiTransactionService);
RunOutcome result = firstStepWorker.run(details, sink);
@@ -122,6 +126,7 @@ class JobDataSinkTest {
assertEquals(JOB_DEF_ID, batchWorkChunk.jobDefinitionId);
assertEquals(JOB_INSTANCE_ID, batchWorkChunk.instanceId);
assertEquals(LAST_STEP_ID, batchWorkChunk.targetStepId);
+assertNotNull(batchWorkChunk.serializedData);
Step1Output stepOutput = JsonUtil.deserialize(batchWorkChunk.serializedData, Step1Output.class);
assertThat(stepOutput.getPids(), hasSize(PID_COUNT));
}

View File

@@ -21,6 +21,7 @@ import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkData;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
+import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.JsonUtil;
import org.junit.jupiter.api.BeforeEach;
@@ -428,7 +429,7 @@ public class WorkChunkProcessorTest {
private class TestWorkChunkProcessor extends WorkChunkProcessor {
public TestWorkChunkProcessor(IJobPersistence thePersistence, BatchJobSender theSender) {
-super(thePersistence, theSender);
+super(thePersistence, theSender, new NonTransactionalHapiTransactionService());
}
@Override