diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml new file mode 100644 index 00000000000..81ed230ca2c --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4526-fix-reduction-step.yaml @@ -0,0 +1,7 @@ +--- +type: fix +issue: 4526 +title: "Fixing an issue where a long running reduction step causes + the message not to be processed fast enough, thereby allowing + multiple reduction step jobs to start. + " diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java index a8fa8a00d25..1920ef0a372 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/coordinator/WorkChannelMessageHandler.java @@ -36,6 +36,7 @@ import ca.uhn.fhir.batch2.util.Batch2Constants; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.util.Logs; import org.apache.commons.lang3.Validate; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; @@ -43,6 +44,9 @@ import org.springframework.messaging.MessagingException; import javax.annotation.Nonnull; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; /** * This handler receives batch work request messages and performs the batch work requested by the message @@ -112,7 +116,27 @@ class WorkChannelMessageHandler implements MessageHandler { } JobStepExecutor stepExecutor = myJobStepExecutorFactory.newJobStepExecutor(instance, workChunk, cursor); - stepExecutor.executeStep(); + // TODO - ls + /* + * We should change this to actually have + * the reduction step take in smaller sets of + * lists of chunks from the previous steps (one + * at a time still) and compose the + * report gradually and in an idempotent way + */ + if (isReductionWorkNotification) { + // do async due to long running process + // we'll fire off a separate thread and let the job continue + ScheduledExecutorService exService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(@NotNull Runnable r) { + return new Thread(r, "Reduction-step-thread"); + } + }); + exService.execute(stepExecutor::executeStep); + } else { + stepExecutor.executeStep(); + } } private void markInProgressIfQueued(JobInstance theInstance) {