diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java index b1ae465ebb8..16c7a6bac9e 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2CoordinatorIT.java @@ -249,16 +249,10 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test { AtomicBoolean completionBool = new AtomicBoolean(); - AtomicBoolean jobStatusBool = new AtomicBoolean(); - myCompletionHandler = (params) -> { - // ensure our completion handler fires + // ensure our completion handler gets the right status assertEquals(StatusEnum.COMPLETED, params.getInstance().getStatus()); completionBool.getAndSet(true); - - if (StatusEnum.COMPLETED.equals(params.getInstance().getStatus())){ - jobStatusBool.getAndSet(true); - } }; buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() { @@ -314,17 +308,16 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test { // waiting myBatch2JobHelper.awaitJobCompletion(instanceId); - myLastStepLatch.awaitExpected(); ourLog.info("awaited the last step"); + myLastStepLatch.awaitExpected(); // verify Optional instanceOp = myJobPersistence.fetchInstance(instanceId); assertTrue(instanceOp.isPresent()); JobInstance jobInstance = instanceOp.get(); - // ensure our completion handler fires with the up-to-date job instance + // ensure our completion handler fired assertTrue(completionBool.get()); - assertTrue(jobStatusBool.get()); assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus()); assertEquals(1.0, jobInstance.getProgress()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java index eae9ea6398c..9cd1fb4fb60 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/Batch2JobMaintenanceIT.java @@ -89,6 +89,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test { @BeforeEach public void before() { + myStorageSettings.setJobFastTrackingEnabled(true); myCompletionHandler = details -> {}; myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings()); JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService; @@ -101,7 +102,6 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test { @AfterEach public void after() { myWorkChannel.clearInterceptorsForUnitTest(); - myStorageSettings.setJobFastTrackingEnabled(true); JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService; jobMaintenanceService.setMaintenanceJobStartedCallback(() -> {}); } diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java index d90129dea19..22db9ae69ef 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/AbstractIJobPersistenceSpecificationTest.java @@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.model.JobDefinition; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; -import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.StopWatch; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; @@ -60,10 +59,10 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro private JobDefinitionRegistry myJobDefinitionRegistry; @Autowired - private IHapiTransactionService myTransactionService; + private PlatformTransactionManager myTransactionManager; - public IHapiTransactionService getTransactionManager() { - return myTransactionService; + public PlatformTransactionManager getTransactionManager() { + return myTransactionManager; } public IJobPersistence getSvc() { diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java index 163c1bb7bb3..fb29b36f7e0 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkCommon.java @@ -6,6 +6,7 @@ import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; +import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionTemplate; public interface IWorkChunkCommon extends WorkChunkTestConstants { @@ -31,7 +32,7 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants { void runMaintenancePass(); - IHapiTransactionService getTransactionManager(); + PlatformTransactionManager getTransactionManager(); IJobPersistence getSvc(); diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java index bc2ae1f0d7d..2b180b3a862 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/config/BaseBatch2Config.java @@ -41,6 +41,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; @Configuration public abstract class BaseBatch2Config { @@ -104,7 +105,7 @@ public abstract class BaseBatch2Config { BatchJobSender theBatchJobSender, WorkChunkProcessor theExecutor, IReductionStepExecutorService theReductionStepExecutorService, - IHapiTransactionService theTransactionService) { + PlatformTransactionManager theTransactionManager) { return new JobMaintenanceServiceImpl( theSchedulerService, myPersistence, @@ -113,7 +114,7 @@ public abstract class BaseBatch2Config { theBatchJobSender, theExecutor, theReductionStepExecutorService, - theTransactionService); + theTransactionManager); } @Bean diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index ebfaafedb01..961386aecf3 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -39,7 +39,10 @@ import ca.uhn.fhir.util.Logs; import ca.uhn.fhir.util.StopWatch; import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.support.DefaultTransactionDefinition; import java.util.List; import java.util.Optional; @@ -59,7 +62,7 @@ public class JobInstanceProcessor { private final String myInstanceId; private final JobDefinitionRegistry myJobDefinitionegistry; - private final IHapiTransactionService myTransactionService; + private final PlatformTransactionManager myTransactionManager; public JobInstanceProcessor( IJobPersistence theJobPersistence, @@ -68,7 +71,7 @@ public class JobInstanceProcessor { JobChunkProgressAccumulator theProgressAccumulator, IReductionStepExecutorService theReductionStepExecutorService, JobDefinitionRegistry theJobDefinitionRegistry, - IHapiTransactionService theTransactionService) { + PlatformTransactionManager theTransactionManager) { myJobPersistence = theJobPersistence; myBatchJobSender = theBatchJobSender; myInstanceId = theInstanceId; @@ -79,7 +82,7 @@ public class JobInstanceProcessor { new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry); myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); - myTransactionService = theTransactionService; + myTransactionManager = theTransactionManager; } public void process() { @@ -199,18 +202,19 @@ public class JobInstanceProcessor { String currentStepId = jobWorkCursor.getCurrentStepId(); boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId); if (canAdvance) { - String nextStepId = jobWorkCursor.nextStep.getStepId(); - ourLog.info( + // current step is the reduction step (a final step) + if (jobWorkCursor.isReductionStep()) { + JobWorkCursor nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( + jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId()); + myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor); + } else { + String nextStepId = jobWorkCursor.nextStep.getStepId(); + ourLog.info( "All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", instanceId, currentStepId, nextStepId); - if (jobWorkCursor.nextStep.isReductionStep()) { - JobWorkCursor nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId( - jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId()); - myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor); - } else { // otherwise, continue processing as expected processChunksForNextSteps(theInstance, nextStepId); } @@ -268,33 +272,35 @@ public class JobInstanceProcessor { // we need a transaction to access the stream of workchunks // because workchunks are created in READY state, there's an unknown // number of them (and so we could be reading many from the db) - getTxBuilder().withPropagation(Propagation.REQUIRES_NEW).execute(() -> { - Stream readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates( - theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY)); - readyChunks.forEach(chunk -> { - JobWorkCursor jobWorkCursor = - JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId()); - if (theJobDefinition.isGatedExecution() - && jobWorkCursor.isFinalStep() - && jobWorkCursor.isReductionStep()) { - // reduction steps are processed by - // ReductionStepExecutorServiceImpl - // which does not wait for steps off the queue. - // so we will not process them here - return; - } + TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition()); + Stream readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates( + theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY)); - /* - * For each chunk id - * * Move to QUEUE'd - * * Send to topic - * * flush changes - * * commit - */ - updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition); - }); + readyChunks.forEach(chunk -> { + JobWorkCursor jobWorkCursor = + JobWorkCursor.fromJobDefinitionAndRequestedStepId(theJobDefinition, chunk.getTargetStepId()); + if (theJobDefinition.isGatedExecution() + && jobWorkCursor.isFinalStep() + && jobWorkCursor.isReductionStep()) { + // reduction steps are processed by + // ReductionStepExecutorServiceImpl + // which does not wait for steps off the queue. + // so we will not process them here + return; + } + + /* + * For each chunk id + * * Move to QUEUE'd + * * Send to topic + * * flush changes + * * commit + */ + updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition); }); + + myTransactionManager.commit(status); } /** @@ -332,10 +338,6 @@ public class JobInstanceProcessor { }); } - private IHapiTransactionService.IExecutionBuilder getTxBuilder() { - return myTransactionService.withSystemRequest().withRequestPartitionId(RequestPartitionId.allPartitions()); - } - private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) { String instanceId = theInstance.getInstanceId(); List queuedChunksForNextStep = diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java index 48a1bfb7571..a85906b34c9 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImpl.java @@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor; import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; -import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; @@ -41,6 +40,7 @@ import org.apache.commons.lang3.time.DateUtils; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.PlatformTransactionManager; import java.util.HashSet; import java.util.List; @@ -90,7 +90,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc private final JobDefinitionRegistry myJobDefinitionRegistry; private final BatchJobSender myBatchJobSender; private final WorkChunkProcessor myJobExecutorSvc; - private final IHapiTransactionService myTransactionService; + private final PlatformTransactionManager myTransactionManager; private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1); @@ -110,7 +110,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc @Nonnull BatchJobSender theBatchJobSender, @Nonnull WorkChunkProcessor theExecutor, @Nonnull IReductionStepExecutorService theReductionStepExecutorService, - IHapiTransactionService theTransactionService) { + PlatformTransactionManager theTransactionService) { myStorageSettings = theStorageSettings; myReductionStepExecutorService = theReductionStepExecutorService; Validate.notNull(theSchedulerService); @@ -123,7 +123,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc myJobDefinitionRegistry = theJobDefinitionRegistry; myBatchJobSender = theBatchJobSender; myJobExecutorSvc = theExecutor; - myTransactionService = theTransactionService; + myTransactionManager = theTransactionService; } @Override @@ -237,7 +237,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc progressAccumulator, myReductionStepExecutorService, myJobDefinitionRegistry, - myTransactionService); + myTransactionManager); ourLog.debug( "Triggering maintenance process for instance {} in status {}", instanceId, diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java index 790ed970c1a..b284f16cb96 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/InstanceProgress.java @@ -73,6 +73,7 @@ public class InstanceProgress { statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1); switch (theChunk.getStatus()) { + case READY: case QUEUED: case IN_PROGRESS: myIncompleteChunkCount++; diff --git a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java index 0ad83b2a4f9..44d8abbd243 100644 --- a/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java +++ b/hapi-fhir-storage-batch2/src/test/java/ca/uhn/fhir/batch2/maintenance/JobMaintenanceServiceImplTest.java @@ -17,8 +17,6 @@ import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; -import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; -import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.test.util.LogbackCaptureTestExtension; @@ -26,8 +24,6 @@ import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; import com.google.common.collect.Lists; -import jakarta.persistence.EntityManager; -import org.hibernate.Session; import org.hl7.fhir.r4.model.DateTimeType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -40,7 +36,7 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; -import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.PlatformTransactionManager; import java.util.Arrays; import java.util.Collections; @@ -54,6 +50,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1; import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2; @@ -70,8 +67,6 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -81,21 +76,6 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class JobMaintenanceServiceImplTest extends BaseBatch2Test { - private static class TestHapiTransactionservice extends HapiTransactionService { - - @Override - protected T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback theCallback) { - return overrideExecute(theCallback); - } - - /** - * Override method for testing purposes (if needed) - */ - public T overrideExecute(TransactionCallback theCallback) { - return null; - } - } - @RegisterExtension LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting"), Level.WARN); @Mock @@ -112,8 +92,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { private JobDefinitionRegistry myJobDefinitionRegistry; @Mock private IChannelProducer myWorkChannelProducer; - @Spy - private IHapiTransactionService myTransactionService = new TestHapiTransactionservice(); + @Mock + private PlatformTransactionManager myTransactionService; @Captor private ArgumentCaptor> myMessageCaptor; @Captor @@ -192,6 +172,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) .thenReturn(chunks.iterator()); + when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + .thenReturn(Stream.empty()); stubUpdateInstanceCallback(instance); mySvc.runMaintenancePass(); @@ -234,6 +216,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) .thenReturn(chunks.iterator()); + when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + .thenReturn(Stream.empty()); stubUpdateInstanceCallback(instance); // Execute @@ -256,22 +240,27 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { // Setup List chunks = Arrays.asList( JobCoordinatorImplTest.createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setId(CHUNK_ID + "abc"), - JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID), - JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID_2) + JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID), + JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID_2) ); - when (myJobPersistence.canAdvanceInstanceToNextStep(any(), any())).thenReturn(true); myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) .thenReturn(chunks.iterator()); - - when(myJobPersistence.fetchAllChunkIdsForStepWithStatus(eq(INSTANCE_ID), eq(STEP_2), eq(WorkChunkStatusEnum.QUEUED))) - .thenReturn(chunks.stream().filter(c->c.getTargetStepId().equals(STEP_2)).map(WorkChunk::getId).collect(Collectors.toList())); + when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString())) + .thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet())); JobInstance instance1 = createInstance(); instance1.setCurrentGatedStepId(STEP_1); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1)); + when(myJobPersistence.fetchAllWorkChunksForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY)))) + .thenReturn(chunks.stream().filter(c -> c.getStatus() == WorkChunkStatusEnum.READY)); + doAnswer(a -> { + Consumer callback = a.getArgument(1); + callback.accept(1); + return null; + }).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any()); stubUpdateInstanceCallback(instance1); // Execute @@ -279,7 +268,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { // Verify verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture()); - verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any()); + verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any()); verifyNoMoreInteractions(myJobPersistence); JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload(); assertEquals(STEP_2, payload0.getTargetStepId()); @@ -297,6 +286,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { instance.setEndTime(parseTime("2001-01-01T12:12:12Z")); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); + when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + .thenReturn(Stream.empty()); mySvc.runMaintenancePass(); @@ -320,6 +311,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator()); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); + when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY)))) + .thenReturn(Stream.empty()); stubUpdateInstanceCallback(instance); // Execute @@ -362,6 +355,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) .thenAnswer(t->chunks.iterator()); + when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY)))) + .thenReturn(Stream.empty()); stubUpdateInstanceCallback(instance); mySvc.runMaintenancePass(); @@ -383,10 +378,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { private void runEnqueueReadyChunksTest(List theChunks, JobDefinition theJobDefinition) { myJobDefinitionRegistry.addJobDefinition(theJobDefinition); JobInstance instance = createInstance(); + // we'll set the instance to the first step id + theChunks.stream().findFirst().ifPresent(c -> { + instance.setCurrentGatedStepId(c.getTargetStepId()); + }); instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId()); - Session sessionContract = mock(Session.class); - // mocks when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance)); @@ -394,20 +391,13 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { .thenAnswer(t -> theChunks.stream()); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) .thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.QUEUED)).toList().iterator()); - // we just need it to fire, so we'll fire it manually - when(((TestHapiTransactionservice)myTransactionService).overrideExecute(any())) - .thenAnswer(args -> { - TransactionCallback callback = args.getArgument(0); - callback.doInTransaction(null); - return null; - }); // test mySvc.runMaintenancePass(); } @Test - public void testMaintenancePass_withREADYworkChunksForReductionSteps_movedToQueueButNotPublished() { + public void testMaintenancePass_withREADYworkChunksForReductionSteps_notQueuedButProcessed() { // setup List chunks = List.of( createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY), @@ -415,19 +405,17 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { ); // when - doAnswer(args -> { - Consumer consumer = args.getArgument(1); - consumer.accept(1); - return 1; - }).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any()); + when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString())) + .thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet())); // test runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction()); - // verify - // saved, but not sent to the queue - verify(myJobPersistence, times(2)).enqueueWorkChunkForProcessing(anyString(), any()); + // verify never updated (should remain in ready state) + verify(myJobPersistence, never()).enqueueWorkChunkForProcessing(anyString(), any()); verify(myWorkChannelProducer, never()).send(any()); + verify(myReductionStepExecutorService) + .triggerReductionStep(anyString(), any()); } @Test @@ -445,7 +433,6 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { return 1; }).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any()); - // test runEnqueueReadyChunksTest(chunks, createJobDefinition()); @@ -464,8 +451,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test { public void testMaintenancePass_whenUpdateFails_skipsWorkChunkAndLogs() { // setup List chunks = List.of( - createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY), - createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY) + createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY), + createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY) ); myLogCapture.setUp(Level.ERROR);