From 83bfa817aebc7f8e176eb975fe31c3e1d9e169f7 Mon Sep 17 00:00:00 2001 From: Michael Buckley Date: Mon, 27 Nov 2023 17:32:58 -0500 Subject: [PATCH] Batch - ensure we target the default partition for non-gated jobs (#5496) * Target the default partition for batch2 storage. --- .../5496-batch-default-partition-non-gated.yaml | 4 ++++ .../fhir/jpa/batch2/JpaJobPersistenceImpl.java | 1 + .../fhir/batch2/config/BaseBatch2Config.java | 9 ++++++--- .../batch2/coordinator/JobCoordinatorImpl.java | 2 ++ .../fhir/batch2/coordinator/JobDataSink.java | 13 +++++++++++-- .../batch2/coordinator/WorkChunkProcessor.java | 17 ++++++++++++++--- .../coordinator/JobCoordinatorImplTest.java | 2 +- .../batch2/coordinator/JobDataSinkTest.java | 7 ++++++- .../coordinator/WorkChunkProcessorTest.java | 3 ++- 9 files changed, 47 insertions(+), 11 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5496-batch-default-partition-non-gated.yaml diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5496-batch-default-partition-non-gated.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5496-batch-default-partition-non-gated.yaml new file mode 100644 index 00000000000..0b8253d22a0 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/7_0_0/5496-batch-default-partition-non-gated.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 5496 +title: "Ensure batch jobs target the default partition for non-gated steps." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java index a90a88cb4f9..12bf7d12cec 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java @@ -100,6 +100,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence { } @Override + @Transactional(propagation = Propagation.REQUIRED) public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) { Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(); entity.setId(UUID.randomUUID().toString()); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index 071a89fe716..4395264fcd5 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -48,10 +48,13 @@ public abstract class BaseBatch2Config { public static final String CHANNEL_NAME = "batch2-work-notification"; @Autowired - private IJobPersistence myPersistence; + IJobPersistence myPersistence; @Autowired - private IChannelFactory myChannelFactory; + IChannelFactory myChannelFactory; + + @Autowired + IHapiTransactionService myHapiTransactionService; @Bean public JobDefinitionRegistry batch2JobDefinitionRegistry() { @@ -60,7 +63,7 @@ public abstract class BaseBatch2Config { @Bean public WorkChunkProcessor jobStepExecutorService(BatchJobSender theBatchJobSender) { - return new WorkChunkProcessor(myPersistence, theBatchJobSender); + return new WorkChunkProcessor(myPersistence, theBatchJobSender, myHapiTransactionService); } @Bean diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java index b51732df3be..a8935f5046a 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImpl.java @@ -43,6 +43,7 @@ import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.springframework.data.domain.Page; import org.springframework.messaging.MessageHandler; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -144,6 +145,7 @@ public class JobCoordinatorImpl implements IJobCoordinator { IJobPersistence.CreateResult instanceAndFirstChunk = myTransactionService .withSystemRequestOnDefaultPartition() + .withPropagation(Propagation.REQUIRES_NEW) .execute(() -> myJobPersistence.onCreateWithFirstChunk(jobDefinition, theStartRequest.getParameters())); JobWorkNotification workNotification = JobWorkNotification.firstStepNotification( diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java index 26fc1c922c7..5881e349779 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/JobDataSink.java @@ -28,10 +28,12 @@ import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkData; import ca.uhn.fhir.i18n.Msg; +import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.JsonUtil; import ca.uhn.fhir.util.Logs; import org.slf4j.Logger; +import org.springframework.transaction.annotation.Propagation; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -49,13 +51,15 @@ class JobDataSink myLastChunkId = new AtomicReference<>(); private final boolean myGatedExecution; + private final IHapiTransactionService myHapiTransactionService; JobDataSink( @Nonnull BatchJobSender theBatchJobSender, @Nonnull IJobPersistence theJobPersistence, @Nonnull JobDefinition theDefinition, @Nonnull String theInstanceId, - @Nonnull JobWorkCursor theJobWorkCursor) { + @Nonnull JobWorkCursor theJobWorkCursor, + IHapiTransactionService theHapiTransactionService) { super(theInstanceId, theJobWorkCursor); myBatchJobSender = theBatchJobSender; myJobPersistence = theJobPersistence; @@ -63,6 +67,7 @@ class JobDataSink myJobPersistence.onWorkChunkCreate(batchWorkChunk)); + myLastChunkId.set(chunkId); if (!myGatedExecution) { diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessor.java index 0c5dd4ba7c9..a68d7dbbcc9 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessor.java @@ -29,6 +29,7 @@ import ca.uhn.fhir.batch2.model.JobDefinitionStep; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.WorkChunk; +import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.Logs; import org.apache.commons.lang3.Validate; @@ -55,11 +56,16 @@ public class WorkChunkProcessor { private final IJobPersistence myJobPersistence; private final BatchJobSender myBatchJobSender; private final StepExecutor myStepExecutor; + private final IHapiTransactionService myHapiTransactionService; - public WorkChunkProcessor(IJobPersistence theJobPersistence, BatchJobSender theSender) { + public WorkChunkProcessor( + IJobPersistence theJobPersistence, + BatchJobSender theSender, + IHapiTransactionService theHapiTransactionService) { myJobPersistence = theJobPersistence; myBatchJobSender = theSender; myStepExecutor = new StepExecutor(theJobPersistence); + myHapiTransactionService = theHapiTransactionService; } /** @@ -118,8 +124,13 @@ public class WorkChunkProcessor { dataSink = (BaseDataSink) new FinalStepDataSink<>( theJobDefinition.getJobDefinitionId(), theInstanceId, theCursor.asFinalCursor()); } else { - dataSink = - new JobDataSink<>(myBatchJobSender, myJobPersistence, theJobDefinition, theInstanceId, theCursor); + dataSink = new JobDataSink<>( + myBatchJobSender, + myJobPersistence, + theJobDefinition, + theInstanceId, + theCursor, + myHapiTransactionService); } return dataSink; } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java index 12ef54af08c..983432c62b0 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobCoordinatorImplTest.java @@ -94,7 +94,7 @@ public class JobCoordinatorImplTest extends BaseBatch2Test { public void beforeEach() { // The code refactored to keep the same functionality, // but in this service (so it's a real service here!) - WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender); + WorkChunkProcessor jobStepExecutorSvc = new WorkChunkProcessor(myJobInstancePersister, myBatchJobSender, new NonTransactionalHapiTransactionService()); mySvc = new JobCoordinatorImpl(myBatchJobSender, myWorkChannelReceiver, myJobInstancePersister, myJobDefinitionRegistry, jobStepExecutorSvc, myJobMaintenanceService, myTransactionService); } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java index 1fb4274bcbe..a7e064f9e68 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/JobDataSinkTest.java @@ -14,6 +14,8 @@ import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobWorkCursor; import ca.uhn.fhir.batch2.model.JobWorkNotification; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; +import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; +import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.JsonUtil; import com.fasterxml.jackson.annotation.JsonProperty; @@ -31,6 +33,7 @@ import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -54,6 +57,7 @@ class JobDataSinkTest { private ArgumentCaptor myJobWorkNotificationCaptor; @Captor private ArgumentCaptor myBatchWorkChunkCaptor; + private final IHapiTransactionService myHapiTransactionService = new NonTransactionalHapiTransactionService(); @Test public void test_sink_accept() { @@ -98,7 +102,7 @@ class JobDataSinkTest { JobInstance instance = JobInstance.fromInstanceId(JOB_INSTANCE_ID); StepExecutionDetails details = new StepExecutionDetails<>(new TestJobParameters().setParam1("" + PID_COUNT), null, instance, CHUNK_ID); JobWorkCursor cursor = new JobWorkCursor<>(job, true, firstStep, lastStep); - JobDataSink sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor); + JobDataSink sink = new JobDataSink<>(myBatchJobSender, myJobPersistence, job, JOB_INSTANCE_ID, cursor, myHapiTransactionService); RunOutcome result = firstStepWorker.run(details, sink); @@ -122,6 +126,7 @@ class JobDataSinkTest { assertEquals(JOB_DEF_ID, batchWorkChunk.jobDefinitionId); assertEquals(JOB_INSTANCE_ID, batchWorkChunk.instanceId); assertEquals(LAST_STEP_ID, batchWorkChunk.targetStepId); + assertNotNull(batchWorkChunk.serializedData); Step1Output stepOutput = JsonUtil.deserialize(batchWorkChunk.serializedData, Step1Output.class); assertThat(stepOutput.getPids(), hasSize(PID_COUNT)); } diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java index 83cc133cb95..10f1a007e5c 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/coordinator/WorkChunkProcessorTest.java @@ -21,6 +21,7 @@ import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent; import ca.uhn.fhir.batch2.model.WorkChunkData; import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; +import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService; import ca.uhn.fhir.model.api.IModelJson; import ca.uhn.fhir.util.JsonUtil; import org.junit.jupiter.api.BeforeEach; @@ -428,7 +429,7 @@ public class WorkChunkProcessorTest { private class TestWorkChunkProcessor extends WorkChunkProcessor { public TestWorkChunkProcessor(IJobPersistence thePersistence, BatchJobSender theSender) { - super(thePersistence, theSender); + super(thePersistence, theSender, new NonTransactionalHapiTransactionService()); } @Override