review points continued

This commit is contained in:
leif stawnyczy 2024-03-08 16:57:32 -05:00
parent 5f5ea1c49e
commit b3dcc17ccb
9 changed files with 97 additions and 113 deletions

View File

@ -249,16 +249,10 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
AtomicBoolean completionBool = new AtomicBoolean(); AtomicBoolean completionBool = new AtomicBoolean();
AtomicBoolean jobStatusBool = new AtomicBoolean();
myCompletionHandler = (params) -> { myCompletionHandler = (params) -> {
// ensure our completion handler fires // ensure our completion handler gets the right status
assertEquals(StatusEnum.COMPLETED, params.getInstance().getStatus()); assertEquals(StatusEnum.COMPLETED, params.getInstance().getStatus());
completionBool.getAndSet(true); completionBool.getAndSet(true);
if (StatusEnum.COMPLETED.equals(params.getInstance().getStatus())){
jobStatusBool.getAndSet(true);
}
}; };
buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() { buildAndDefine3StepReductionJob(jobId, new IReductionStepHandler() {
@ -314,17 +308,16 @@ public class Batch2CoordinatorIT extends BaseJpaR4Test {
// waiting // waiting
myBatch2JobHelper.awaitJobCompletion(instanceId); myBatch2JobHelper.awaitJobCompletion(instanceId);
myLastStepLatch.awaitExpected();
ourLog.info("awaited the last step"); ourLog.info("awaited the last step");
myLastStepLatch.awaitExpected();
// verify // verify
Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId); Optional<JobInstance> instanceOp = myJobPersistence.fetchInstance(instanceId);
assertTrue(instanceOp.isPresent()); assertTrue(instanceOp.isPresent());
JobInstance jobInstance = instanceOp.get(); JobInstance jobInstance = instanceOp.get();
// ensure our completion handler fires with the up-to-date job instance // ensure our completion handler fired
assertTrue(completionBool.get()); assertTrue(completionBool.get());
assertTrue(jobStatusBool.get());
assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus()); assertEquals(StatusEnum.COMPLETED, jobInstance.getStatus());
assertEquals(1.0, jobInstance.getProgress()); assertEquals(1.0, jobInstance.getProgress());

View File

@ -89,6 +89,7 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@BeforeEach @BeforeEach
public void before() { public void before() {
myStorageSettings.setJobFastTrackingEnabled(true);
myCompletionHandler = details -> {}; myCompletionHandler = details -> {};
myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings()); myWorkChannel = (LinkedBlockingChannel) myChannelFactory.getOrCreateReceiver(CHANNEL_NAME, JobWorkNotificationJsonMessage.class, new ChannelConsumerSettings());
JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService; JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService;
@ -101,7 +102,6 @@ public class Batch2JobMaintenanceIT extends BaseJpaR4Test {
@AfterEach @AfterEach
public void after() { public void after() {
myWorkChannel.clearInterceptorsForUnitTest(); myWorkChannel.clearInterceptorsForUnitTest();
myStorageSettings.setJobFastTrackingEnabled(true);
JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService; JobMaintenanceServiceImpl jobMaintenanceService = (JobMaintenanceServiceImpl) myJobMaintenanceService;
jobMaintenanceService.setMaintenanceJobStartedCallback(() -> {}); jobMaintenanceService.setMaintenanceJobStartedCallback(() -> {});
} }

View File

@ -27,7 +27,6 @@ import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum; import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent; import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
@ -60,10 +59,10 @@ public abstract class AbstractIJobPersistenceSpecificationTest implements IInPro
private JobDefinitionRegistry myJobDefinitionRegistry; private JobDefinitionRegistry myJobDefinitionRegistry;
@Autowired @Autowired
private IHapiTransactionService myTransactionService; private PlatformTransactionManager myTransactionManager;
public IHapiTransactionService getTransactionManager() { public PlatformTransactionManager getTransactionManager() {
return myTransactionService; return myTransactionManager;
} }
public IJobPersistence getSvc() { public IJobPersistence getSvc() {

View File

@ -6,6 +6,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService; import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters; import ca.uhn.hapi.fhir.batch2.test.support.TestJobParameters;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate; import org.springframework.transaction.support.TransactionTemplate;
public interface IWorkChunkCommon extends WorkChunkTestConstants { public interface IWorkChunkCommon extends WorkChunkTestConstants {
@ -31,7 +32,7 @@ public interface IWorkChunkCommon extends WorkChunkTestConstants {
void runMaintenancePass(); void runMaintenancePass();
IHapiTransactionService getTransactionManager(); PlatformTransactionManager getTransactionManager();
IJobPersistence getSvc(); IJobPersistence getSvc();

View File

@ -41,6 +41,7 @@ import ca.uhn.fhir.jpa.subscription.channel.api.IChannelReceiver;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration @Configuration
public abstract class BaseBatch2Config { public abstract class BaseBatch2Config {
@ -104,7 +105,7 @@ public abstract class BaseBatch2Config {
BatchJobSender theBatchJobSender, BatchJobSender theBatchJobSender,
WorkChunkProcessor theExecutor, WorkChunkProcessor theExecutor,
IReductionStepExecutorService theReductionStepExecutorService, IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService) { PlatformTransactionManager theTransactionManager) {
return new JobMaintenanceServiceImpl( return new JobMaintenanceServiceImpl(
theSchedulerService, theSchedulerService,
myPersistence, myPersistence,
@ -113,7 +114,7 @@ public abstract class BaseBatch2Config {
theBatchJobSender, theBatchJobSender,
theExecutor, theExecutor,
theReductionStepExecutorService, theReductionStepExecutorService,
theTransactionService); theTransactionManager);
} }
@Bean @Bean

View File

@ -39,7 +39,10 @@ import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch; import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -59,7 +62,7 @@ public class JobInstanceProcessor {
private final String myInstanceId; private final String myInstanceId;
private final JobDefinitionRegistry myJobDefinitionegistry; private final JobDefinitionRegistry myJobDefinitionegistry;
private final IHapiTransactionService myTransactionService; private final PlatformTransactionManager myTransactionManager;
public JobInstanceProcessor( public JobInstanceProcessor(
IJobPersistence theJobPersistence, IJobPersistence theJobPersistence,
@ -68,7 +71,7 @@ public class JobInstanceProcessor {
JobChunkProgressAccumulator theProgressAccumulator, JobChunkProgressAccumulator theProgressAccumulator,
IReductionStepExecutorService theReductionStepExecutorService, IReductionStepExecutorService theReductionStepExecutorService,
JobDefinitionRegistry theJobDefinitionRegistry, JobDefinitionRegistry theJobDefinitionRegistry,
IHapiTransactionService theTransactionService) { PlatformTransactionManager theTransactionManager) {
myJobPersistence = theJobPersistence; myJobPersistence = theJobPersistence;
myBatchJobSender = theBatchJobSender; myBatchJobSender = theBatchJobSender;
myInstanceId = theInstanceId; myInstanceId = theInstanceId;
@ -79,7 +82,7 @@ public class JobInstanceProcessor {
new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry); new JobInstanceProgressCalculator(theJobPersistence, theProgressAccumulator, theJobDefinitionRegistry);
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry); myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
myTransactionService = theTransactionService; myTransactionManager = theTransactionManager;
} }
public void process() { public void process() {
@ -199,6 +202,12 @@ public class JobInstanceProcessor {
String currentStepId = jobWorkCursor.getCurrentStepId(); String currentStepId = jobWorkCursor.getCurrentStepId();
boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId); boolean canAdvance = canAdvanceGatedJob(theJobDefinition, theInstance, currentStepId);
if (canAdvance) { if (canAdvance) {
// current step is the reduction step (a final step)
if (jobWorkCursor.isReductionStep()) {
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.getCurrentStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else {
String nextStepId = jobWorkCursor.nextStep.getStepId(); String nextStepId = jobWorkCursor.nextStep.getStepId();
ourLog.info( ourLog.info(
"All processing is complete for gated execution of instance {} step {}. Proceeding to step {}", "All processing is complete for gated execution of instance {} step {}. Proceeding to step {}",
@ -206,11 +215,6 @@ public class JobInstanceProcessor {
currentStepId, currentStepId,
nextStepId); nextStepId);
if (jobWorkCursor.nextStep.isReductionStep()) {
JobWorkCursor<?, ?, ?> nextJobWorkCursor = JobWorkCursor.fromJobDefinitionAndRequestedStepId(
jobWorkCursor.getJobDefinition(), jobWorkCursor.nextStep.getStepId());
myReductionStepExecutorService.triggerReductionStep(instanceId, nextJobWorkCursor);
} else {
// otherwise, continue processing as expected // otherwise, continue processing as expected
processChunksForNextSteps(theInstance, nextStepId); processChunksForNextSteps(theInstance, nextStepId);
} }
@ -268,7 +272,8 @@ public class JobInstanceProcessor {
// we need a transaction to access the stream of workchunks // we need a transaction to access the stream of workchunks
// because workchunks are created in READY state, there's an unknown // because workchunks are created in READY state, there's an unknown
// number of them (and so we could be reading many from the db) // number of them (and so we could be reading many from the db)
getTxBuilder().withPropagation(Propagation.REQUIRES_NEW).execute(() -> {
TransactionStatus status = myTransactionManager.getTransaction(new DefaultTransactionDefinition());
Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates( Stream<WorkChunk> readyChunks = myJobPersistence.fetchAllWorkChunksForJobInStates(
theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY)); theJobInstance.getInstanceId(), Set.of(WorkChunkStatusEnum.READY));
@ -294,7 +299,8 @@ public class JobInstanceProcessor {
*/ */
updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition); updateChunkAndSendToQueue(chunk, theJobInstance, theJobDefinition);
}); });
});
myTransactionManager.commit(status);
} }
/** /**
@ -332,10 +338,6 @@ public class JobInstanceProcessor {
}); });
} }
private IHapiTransactionService.IExecutionBuilder getTxBuilder() {
return myTransactionService.withSystemRequest().withRequestPartitionId(RequestPartitionId.allPartitions());
}
private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) { private void processChunksForNextSteps(JobInstance theInstance, String nextStepId) {
String instanceId = theInstance.getInstanceId(); String instanceId = theInstance.getInstanceId();
List<String> queuedChunksForNextStep = List<String> queuedChunksForNextStep =

View File

@ -28,7 +28,6 @@ import ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor;
import ca.uhn.fhir.batch2.model.JobInstance; import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.i18n.Msg; import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob; import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs; import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
@ -41,6 +40,7 @@ import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -90,7 +90,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
private final JobDefinitionRegistry myJobDefinitionRegistry; private final JobDefinitionRegistry myJobDefinitionRegistry;
private final BatchJobSender myBatchJobSender; private final BatchJobSender myBatchJobSender;
private final WorkChunkProcessor myJobExecutorSvc; private final WorkChunkProcessor myJobExecutorSvc;
private final IHapiTransactionService myTransactionService; private final PlatformTransactionManager myTransactionManager;
private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1); private final Semaphore myRunMaintenanceSemaphore = new Semaphore(1);
@ -110,7 +110,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
@Nonnull BatchJobSender theBatchJobSender, @Nonnull BatchJobSender theBatchJobSender,
@Nonnull WorkChunkProcessor theExecutor, @Nonnull WorkChunkProcessor theExecutor,
@Nonnull IReductionStepExecutorService theReductionStepExecutorService, @Nonnull IReductionStepExecutorService theReductionStepExecutorService,
IHapiTransactionService theTransactionService) { PlatformTransactionManager theTransactionService) {
myStorageSettings = theStorageSettings; myStorageSettings = theStorageSettings;
myReductionStepExecutorService = theReductionStepExecutorService; myReductionStepExecutorService = theReductionStepExecutorService;
Validate.notNull(theSchedulerService); Validate.notNull(theSchedulerService);
@ -123,7 +123,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
myJobDefinitionRegistry = theJobDefinitionRegistry; myJobDefinitionRegistry = theJobDefinitionRegistry;
myBatchJobSender = theBatchJobSender; myBatchJobSender = theBatchJobSender;
myJobExecutorSvc = theExecutor; myJobExecutorSvc = theExecutor;
myTransactionService = theTransactionService; myTransactionManager = theTransactionService;
} }
@Override @Override
@ -237,7 +237,7 @@ public class JobMaintenanceServiceImpl implements IJobMaintenanceService, IHasSc
progressAccumulator, progressAccumulator,
myReductionStepExecutorService, myReductionStepExecutorService,
myJobDefinitionRegistry, myJobDefinitionRegistry,
myTransactionService); myTransactionManager);
ourLog.debug( ourLog.debug(
"Triggering maintenance process for instance {} in status {}", "Triggering maintenance process for instance {} in status {}",
instanceId, instanceId,

View File

@ -73,6 +73,7 @@ public class InstanceProgress {
statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1); statusToCountMap.put(theChunk.getStatus(), statusToCountMap.getOrDefault(theChunk.getStatus(), 0) + 1);
switch (theChunk.getStatus()) { switch (theChunk.getStatus()) {
case READY:
case QUEUED: case QUEUED:
case IN_PROGRESS: case IN_PROGRESS:
myIncompleteChunkCount++; myIncompleteChunkCount++;

View File

@ -17,8 +17,6 @@ import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk; import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum; import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; import ca.uhn.fhir.jpa.api.config.JpaStorageSettings;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.ISchedulerService; import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer; import ca.uhn.fhir.jpa.subscription.channel.api.IChannelProducer;
import ca.uhn.test.util.LogbackCaptureTestExtension; import ca.uhn.test.util.LogbackCaptureTestExtension;
@ -26,8 +24,6 @@ import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import jakarta.persistence.EntityManager;
import org.hibernate.Session;
import org.hl7.fhir.r4.model.DateTimeType; import org.hl7.fhir.r4.model.DateTimeType;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -40,7 +36,7 @@ import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.PlatformTransactionManager;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -54,6 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1; import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep1;
import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2; import static ca.uhn.fhir.batch2.coordinator.JobCoordinatorImplTest.createWorkChunkStep2;
@ -70,8 +67,6 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -81,21 +76,6 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class JobMaintenanceServiceImplTest extends BaseBatch2Test { public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private static class TestHapiTransactionservice extends HapiTransactionService {
@Override
protected <T> T doExecute(ExecutionBuilder theExecutionBuilder, TransactionCallback<T> theCallback) {
return overrideExecute(theCallback);
}
/**
* Override method for testing purposes (if needed)
*/
public <T> T overrideExecute(TransactionCallback<T> theCallback) {
return null;
}
}
@RegisterExtension @RegisterExtension
LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting"), Level.WARN); LogbackCaptureTestExtension myLogCapture = new LogbackCaptureTestExtension((Logger) LoggerFactory.getLogger("ca.uhn.fhir.log.batch_troubleshooting"), Level.WARN);
@Mock @Mock
@ -112,8 +92,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private JobDefinitionRegistry myJobDefinitionRegistry; private JobDefinitionRegistry myJobDefinitionRegistry;
@Mock @Mock
private IChannelProducer myWorkChannelProducer; private IChannelProducer myWorkChannelProducer;
@Spy @Mock
private IHapiTransactionService myTransactionService = new TestHapiTransactionservice(); private PlatformTransactionManager myTransactionService;
@Captor @Captor
private ArgumentCaptor<Message<JobWorkNotification>> myMessageCaptor; private ArgumentCaptor<Message<JobWorkNotification>> myMessageCaptor;
@Captor @Captor
@ -192,6 +172,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator()); .thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance); stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass(); mySvc.runMaintenancePass();
@ -234,6 +216,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator()); .thenReturn(chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance); stubUpdateInstanceCallback(instance);
// Execute // Execute
@ -256,22 +240,27 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// Setup // Setup
List<WorkChunk> chunks = Arrays.asList( List<WorkChunk> chunks = Arrays.asList(
JobCoordinatorImplTest.createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setId(CHUNK_ID + "abc"), JobCoordinatorImplTest.createWorkChunkStep1().setStatus(WorkChunkStatusEnum.COMPLETED).setId(CHUNK_ID + "abc"),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID), JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID),
JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.QUEUED).setId(CHUNK_ID_2) JobCoordinatorImplTest.createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY).setId(CHUNK_ID_2)
); );
when (myJobPersistence.canAdvanceInstanceToNextStep(any(), any())).thenReturn(true);
myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution)); myJobDefinitionRegistry.addJobDefinition(createJobDefinition(JobDefinition.Builder::gatedExecution));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false))) when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), eq(false)))
.thenReturn(chunks.iterator()); .thenReturn(chunks.iterator());
when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString()))
when(myJobPersistence.fetchAllChunkIdsForStepWithStatus(eq(INSTANCE_ID), eq(STEP_2), eq(WorkChunkStatusEnum.QUEUED))) .thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet()));
.thenReturn(chunks.stream().filter(c->c.getTargetStepId().equals(STEP_2)).map(WorkChunk::getId).collect(Collectors.toList()));
JobInstance instance1 = createInstance(); JobInstance instance1 = createInstance();
instance1.setCurrentGatedStepId(STEP_1); instance1.setCurrentGatedStepId(STEP_1);
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance1));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance1));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(anyString(), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(chunks.stream().filter(c -> c.getStatus() == WorkChunkStatusEnum.READY));
doAnswer(a -> {
Consumer<Integer> callback = a.getArgument(1);
callback.accept(1);
return null;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
stubUpdateInstanceCallback(instance1); stubUpdateInstanceCallback(instance1);
// Execute // Execute
@ -279,7 +268,7 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
// Verify // Verify
verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture()); verify(myWorkChannelProducer, times(2)).send(myMessageCaptor.capture());
verify(myJobPersistence, times(2)).updateInstance(eq(INSTANCE_ID), any()); verify(myJobPersistence, times(1)).updateInstance(eq(INSTANCE_ID), any());
verifyNoMoreInteractions(myJobPersistence); verifyNoMoreInteractions(myJobPersistence);
JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload(); JobWorkNotification payload0 = myMessageCaptor.getAllValues().get(0).getPayload();
assertEquals(STEP_2, payload0.getTargetStepId()); assertEquals(STEP_2, payload0.getTargetStepId());
@ -297,6 +286,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
instance.setEndTime(parseTime("2001-01-01T12:12:12Z")); instance.setEndTime(parseTime("2001-01-01T12:12:12Z"));
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
mySvc.runMaintenancePass(); mySvc.runMaintenancePass();
@ -320,6 +311,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator()); when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())).thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance)); when(myJobPersistence.fetchInstance(INSTANCE_ID)).thenReturn(Optional.of(instance));
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(INSTANCE_ID), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance); stubUpdateInstanceCallback(instance);
// Execute // Execute
@ -362,6 +355,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t->chunks.iterator()); .thenAnswer(t->chunks.iterator());
when(myJobPersistence.fetchAllWorkChunksForJobInStates(eq(instance.getInstanceId()), eq(Set.of(WorkChunkStatusEnum.READY))))
.thenReturn(Stream.empty());
stubUpdateInstanceCallback(instance); stubUpdateInstanceCallback(instance);
mySvc.runMaintenancePass(); mySvc.runMaintenancePass();
@ -383,10 +378,12 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
private void runEnqueueReadyChunksTest(List<WorkChunk> theChunks, JobDefinition<TestJobParameters> theJobDefinition) { private void runEnqueueReadyChunksTest(List<WorkChunk> theChunks, JobDefinition<TestJobParameters> theJobDefinition) {
myJobDefinitionRegistry.addJobDefinition(theJobDefinition); myJobDefinitionRegistry.addJobDefinition(theJobDefinition);
JobInstance instance = createInstance(); JobInstance instance = createInstance();
// we'll set the instance to the first step id
theChunks.stream().findFirst().ifPresent(c -> {
instance.setCurrentGatedStepId(c.getTargetStepId());
});
instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId()); instance.setJobDefinitionId(theJobDefinition.getJobDefinitionId());
Session sessionContract = mock(Session.class);
// mocks // mocks
when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance)); when(myJobPersistence.fetchInstances(anyInt(), eq(0))).thenReturn(Lists.newArrayList(instance));
when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance)); when(myJobPersistence.fetchInstance(eq(INSTANCE_ID))).thenReturn(Optional.of(instance));
@ -394,20 +391,13 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
.thenAnswer(t -> theChunks.stream()); .thenAnswer(t -> theChunks.stream());
when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean())) when(myJobPersistence.fetchAllWorkChunksIterator(eq(INSTANCE_ID), anyBoolean()))
.thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.QUEUED)).toList().iterator()); .thenAnswer(t -> theChunks.stream().map(c -> c.setStatus(WorkChunkStatusEnum.QUEUED)).toList().iterator());
// we just need it to fire, so we'll fire it manually
when(((TestHapiTransactionservice)myTransactionService).overrideExecute(any()))
.thenAnswer(args -> {
TransactionCallback<?> callback = args.getArgument(0);
callback.doInTransaction(null);
return null;
});
// test // test
mySvc.runMaintenancePass(); mySvc.runMaintenancePass();
} }
@Test @Test
public void testMaintenancePass_withREADYworkChunksForReductionSteps_movedToQueueButNotPublished() { public void testMaintenancePass_withREADYworkChunksForReductionSteps_notQueuedButProcessed() {
// setup // setup
List<WorkChunk> chunks = List.of( List<WorkChunk> chunks = List.of(
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY), createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY),
@ -415,19 +405,17 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
); );
// when // when
doAnswer(args -> { when(myJobPersistence.getDistinctWorkChunkStatesForJobAndStep(anyString(), anyString()))
Consumer<Integer> consumer = args.getArgument(1); .thenReturn(chunks.stream().map(WorkChunk::getStatus).collect(Collectors.toSet()));
consumer.accept(1);
return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
// test // test
runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction()); runEnqueueReadyChunksTest(chunks, createJobDefinitionWithReduction());
// verify // verify never updated (should remain in ready state)
// saved, but not sent to the queue verify(myJobPersistence, never()).enqueueWorkChunkForProcessing(anyString(), any());
verify(myJobPersistence, times(2)).enqueueWorkChunkForProcessing(anyString(), any());
verify(myWorkChannelProducer, never()).send(any()); verify(myWorkChannelProducer, never()).send(any());
verify(myReductionStepExecutorService)
.triggerReductionStep(anyString(), any());
} }
@Test @Test
@ -445,7 +433,6 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
return 1; return 1;
}).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any()); }).when(myJobPersistence).enqueueWorkChunkForProcessing(anyString(), any());
// test // test
runEnqueueReadyChunksTest(chunks, createJobDefinition()); runEnqueueReadyChunksTest(chunks, createJobDefinition());
@ -464,8 +451,8 @@ public class JobMaintenanceServiceImplTest extends BaseBatch2Test {
public void testMaintenancePass_whenUpdateFails_skipsWorkChunkAndLogs() { public void testMaintenancePass_whenUpdateFails_skipsWorkChunkAndLogs() {
// setup // setup
List<WorkChunk> chunks = List.of( List<WorkChunk> chunks = List.of(
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY), createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY),
createWorkChunkStep3().setStatus(WorkChunkStatusEnum.READY) createWorkChunkStep2().setStatus(WorkChunkStatusEnum.READY)
); );
myLogCapture.setUp(Level.ERROR); myLogCapture.setUp(Level.ERROR);