From d5ebd1f667baf87750c39d3559d574f4b07046cb Mon Sep 17 00:00:00 2001
From: michaelabuckley
Date: Sat, 4 Mar 2023 12:12:04 -0500
Subject: [PATCH] Avoid fetching work-chunk data (#4622)

* Replace work-chunk maintenance query with projection to avoid fetching clob data.

* Add processing logging
---
 .../changelog/6_4_0/4622-batch2-chunk-io.yaml |  4 ++
 .../jpa/batch2/JpaJobPersistenceImpl.java     | 25 +++++--
 .../dao/data/IBatch2WorkChunkRepository.java  | 11 +++
 .../jpa/entity/Batch2WorkChunkEntity.java     | 31 ++++++++
 .../jpa/batch2/JpaJobPersistenceImplTest.java | 69 ------------------
 .../jpa/batch2/JpaJobPersistenceImplTest.java | 71 +++++++++++++++++++
 .../maintenance/JobInstanceProcessor.java     |  6 ++
 .../JobInstanceProgressCalculator.java        |  6 ++
 8 files changed, 148 insertions(+), 75 deletions(-)
 create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4622-batch2-chunk-io.yaml

diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4622-batch2-chunk-io.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4622-batch2-chunk-io.yaml
new file mode 100644
index 00000000000..7676575795b
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_4_0/4622-batch2-chunk-io.yaml
@@ -0,0 +1,4 @@
+---
+type: perf
+issue: 4622
+title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
index c5f04060c68..b2994b4026b 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImpl.java
@@ -322,12 +322,24 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 	}
 
 	private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
-		myTxTemplate.executeWithoutResult(tx -> {
-			List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
-			for (Batch2WorkChunkEntity chunk : chunks) {
-				theConsumer.accept(toChunk(chunk, theIncludeData));
-			}
-		});
+		if (theIncludeData) {
+			// I think this is dead: MB
+			myTxTemplate.executeWithoutResult(tx -> {
+				List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
+				for (Batch2WorkChunkEntity chunk : chunks) {
+					theConsumer.accept(toChunk(chunk, theIncludeData));
+				}
+			});
+		} else {
+			// wipmb mb here
+			// a minimally-different path for a prod-fix.
+			myTxTemplate.executeWithoutResult(tx -> {
+				List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksNoData(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
+				for (Batch2WorkChunkEntity chunk : chunks) {
+					theConsumer.accept(toChunk(chunk, theIncludeData));
+				}
+			});
+		}
 	}
 
 	@Override
@@ -355,6 +367,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 	 */
 	@Override
 	public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
+		// wipmb mb here
 		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
 	}
 
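For orientation, this is roughly how a maintenance-side caller consumes the new no-data path. A minimal sketch, not code from this patch: fetchAllWorkChunksIterator, WorkChunk, and getData() appear above, but the jobPersistence/instanceId wiring is assumed, and imports are elided.

    // Hedged sketch of caller-side usage; surrounding wiring is assumed.
    Iterator<WorkChunk> chunks = jobPersistence.fetchAllWorkChunksIterator(instanceId, false);
    while (chunks.hasNext()) {
        WorkChunk chunk = chunks.next();
        // On the no-data path chunk.getData() is null; status, timestamps, and
        // counts are still populated, which is all the maintenance pass needs.
    }
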
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java
index 16b68d6090a..d703da4461e 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/data/IBatch2WorkChunkRepository.java
@@ -37,6 +37,17 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChunkEntity, String> {
 	List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
 
+	/**
+	 * A projection query to avoid fetching the CLOB over the wire.
+	 * Otherwise, the same as fetchChunks.
+	 */
+	@Query("SELECT new Batch2WorkChunkEntity(" +
+		"e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," +
+		"e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," +
+		"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed" +
+		") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
+	List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
+
 	@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
 	List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
 
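The SELECT new form in fetchChunksNoData is a standard JPQL constructor expression: the provider builds the SQL select list from only the listed fields, so the CLOB column never leaves the database. A minimal, self-contained illustration of the technique using a hypothetical Book entity that is not part of this patch (strict JPA requires a fully-qualified class name after "new"; Hibernate also accepts the simple entity name, which is what the query above relies on):

    @Entity
    public class Book {
        @Id private Long id;
        private String title;
        @Lob private String fullText; // the large column we want to avoid reading

        protected Book() {} // no-arg constructor for Hibernate

        public Book(Long id, String title) { // must match the SELECT new argument list
            this.id = id;
            this.title = title;
        }
    }

    public interface BookRepository extends JpaRepository<Book, Long> {
        // fullText is absent from the select list, so no LOB I/O occurs
        @Query("SELECT new Book(b.id, b.title) FROM Book b")
        List<Book> findAllWithoutText();
    }
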
diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java
index a0598b87893..73ddf421663 100644
--- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java
+++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/entity/Batch2WorkChunkEntity.java
@@ -98,6 +98,36 @@ public class Batch2WorkChunkEntity implements Serializable {
 	@Column(name = "ERROR_COUNT", nullable = false)
 	private int myErrorCount;
 
+
+	/**
+	 * Default constructor for Hibernate.
+	 */
+	public Batch2WorkChunkEntity() {
+	}
+
+	/**
+	 * Projection constructor for no-data path.
+	 */
+	public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
+								 String theInstanceId, String theTargetStepId, StatusEnum theStatus,
+								 Date theCreateTime, Date theStartTime, Date theUpdateTime, Date theEndTime,
+								 String theErrorMessage, int theErrorCount, Integer theRecordsProcessed) {
+		myId = theId;
+		mySequence = theSequence;
+		myJobDefinitionId = theJobDefinitionId;
+		myJobDefinitionVersion = theJobDefinitionVersion;
+		myInstanceId = theInstanceId;
+		myTargetStepId = theTargetStepId;
+		myStatus = theStatus;
+		myCreateTime = theCreateTime;
+		myStartTime = theStartTime;
+		myUpdateTime = theUpdateTime;
+		myEndTime = theEndTime;
+		myErrorMessage = theErrorMessage;
+		myErrorCount = theErrorCount;
+		myRecordsProcessed = theRecordsProcessed;
+	}
+
 	public int getErrorCount() {
 		return myErrorCount;
 	}
@@ -242,4 +272,5 @@ public class Batch2WorkChunkEntity implements Serializable {
 			.append("errorMessage", myErrorMessage)
 			.toString();
 	}
+
 }
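The projection constructor only works if its parameter list matches the SELECT new argument list in fetchChunksNoData in both order and type; the serialized-data field is deliberately absent from both, which is why getData() comes back null on this path. A hypothetical direct invocation, with made-up values, just to make the parameter order visible:

    // Illustrative values only; order and types follow the constructor above.
    Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity(
        "chunk-id", 0, "jobDefId", 1, "instance-id", "step-id", StatusEnum.QUEUED,
        new Date(), null, new Date(), null, // create / start / update / end times
        null, 0, null);                     // errorMessage, errorCount, recordsProcessed
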
diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
index e82bc8b3f80..b632e86b41a 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
@@ -4,40 +4,31 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
 import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
 import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.StatusEnum;
-import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
 import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
 import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
-import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
 import ca.uhn.fhir.jpa.util.JobInstanceUtil;
-import ca.uhn.fhir.model.api.PagingIterator;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.data.domain.PageRequest;
 import org.springframework.data.domain.Pageable;
 import org.springframework.transaction.PlatformTransactionManager;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -192,66 +183,6 @@ class JpaJobPersistenceImplTest {
 		}
 	}
 
-	@Test
-	public void fetchAllWorkChunksIterator_withValidIdAndBoolToSayToIncludeData_returnsPagingIterator() {
-		// setup
-		String instanceId = "instanceId";
-		String jobDefinition = "definitionId";
-		int version = 1;
-		String targetStep = "step";
-
-		List<Batch2WorkChunkEntity> workChunkEntityList = new ArrayList<>();
-		Batch2WorkChunkEntity chunk1 = new Batch2WorkChunkEntity();
-		chunk1.setId("id1");
-		chunk1.setJobDefinitionVersion(version);
-		chunk1.setJobDefinitionId(jobDefinition);
-		chunk1.setSerializedData("serialized data 1");
-		chunk1.setTargetStepId(targetStep);
-		workChunkEntityList.add(chunk1);
-		Batch2WorkChunkEntity chunk2 = new Batch2WorkChunkEntity();
-		chunk2.setId("id2");
-		chunk2.setSerializedData("serialized data 2");
-		chunk2.setJobDefinitionId(jobDefinition);
-		chunk2.setJobDefinitionVersion(version);
-		chunk2.setTargetStepId(targetStep);
-		workChunkEntityList.add(chunk2);
-
-		for (boolean includeData : new boolean[] { true, false }) {
-			// when
-			when(myWorkChunkRepository.fetchChunks(any(PageRequest.class), eq(instanceId)))
-				.thenReturn(workChunkEntityList);
-
-			// test
-			Iterator<WorkChunk> chunkIterator = mySvc.fetchAllWorkChunksIterator(instanceId, includeData);
-
-			// verify
-			assertTrue(chunkIterator instanceof PagingIterator);
-			verify(myWorkChunkRepository, never())
-				.fetchChunks(any(PageRequest.class), anyString());
-
-			// now try the iterator out...
-			WorkChunk chunk = chunkIterator.next();
-			assertEquals(chunk1.getId(), chunk.getId());
-			if (includeData) {
-				assertEquals(chunk1.getSerializedData(), chunk.getData());
-			} else {
-				assertNull(chunk.getData());
-			}
-			chunk = chunkIterator.next();
-			assertEquals(chunk2.getId(), chunk.getId());
-			if (includeData) {
-				assertEquals(chunk2.getSerializedData(), chunk.getData());
-			} else {
-				assertNull(chunk.getData());
-			}
-
-			verify(myWorkChunkRepository)
-				.fetchChunks(any(PageRequest.class), eq(instanceId));
-
-			reset(myWorkChunkRepository);
-		}
-	}
-
 	@Test
 	public void fetchInstances_validRequest_returnsFoundInstances() {
 		// setup
diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
index 67aadbf9018..9302986767a 100644
--- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
+++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/batch2/JpaJobPersistenceImplTest.java
@@ -14,6 +14,7 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
 import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
 import ca.uhn.fhir.util.JsonUtil;
+import com.google.common.collect.Iterators;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
@@ -358,6 +359,76 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
 		assertNull(chunk.getData());
 	}
 
+	@Test
+	void testStoreAndFetchChunksForInstance_NoData() {
+		//wipmb here
+		// given
+		JobInstance instance = createInstance();
+		String instanceId = mySvc.storeNewInstance(instance);
+
+		String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
+		String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
+		String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");
+
+		mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(erroredId);
+		MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
+		parameters.setChunkId(erroredId);
+		parameters.setErrorMsg("Our error message");
+		mySvc.markWorkChunkAsErroredAndIncrementErrorCount(parameters);
+
+		mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(completedId);
+		mySvc.markWorkChunkAsCompletedAndClearData(instanceId, completedId, 11);
+
+		// when
+		Iterator<WorkChunk> workChunks = mySvc.fetchAllWorkChunksIterator(instanceId, false);
+
+		// then
+		ArrayList<WorkChunk> chunks = new ArrayList<>();
+		Iterators.addAll(chunks, workChunks);
+		assertEquals(3, chunks.size());
+
+		{
+			WorkChunk workChunk = chunks.get(0);
+			assertNull(workChunk.getData(), "we skip the data");
+			assertEquals(queuedId, workChunk.getId());
+			assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
+			assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
+			assertEquals(instanceId, workChunk.getInstanceId());
+			assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
+			assertEquals(0, workChunk.getSequence());
+			assertEquals(StatusEnum.QUEUED, workChunk.getStatus());
+
+
+			assertNotNull(workChunk.getCreateTime());
+			assertNotNull(workChunk.getStartTime());
+			assertNotNull(workChunk.getUpdateTime());
+			assertNull(workChunk.getEndTime());
+			assertNull(workChunk.getErrorMessage());
+			assertEquals(0, workChunk.getErrorCount());
+			assertEquals(null, workChunk.getRecordsProcessed());
+		}
+
+		{
+			WorkChunk workChunk1 = chunks.get(1);
+			assertEquals(StatusEnum.ERRORED, workChunk1.getStatus());
+			assertEquals("Our error message", workChunk1.getErrorMessage());
+			assertEquals(1, workChunk1.getErrorCount());
+			assertEquals(null, workChunk1.getRecordsProcessed());
+			assertNotNull(workChunk1.getEndTime());
+		}
+
+		{
+			WorkChunk workChunk2 = chunks.get(2);
+			assertEquals(StatusEnum.COMPLETED, workChunk2.getStatus());
+			assertNotNull(workChunk2.getEndTime());
+			assertEquals(11, workChunk2.getRecordsProcessed());
+			assertNull(workChunk2.getErrorMessage());
+			assertEquals(0, workChunk2.getErrorCount());
+		}
+
+	}
+
+
 	@Test
 	public void testStoreAndFetchWorkChunk_WithData() {
 		JobInstance instance = createInstance();
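The new test drains the iterator with Guava's Iterators.addAll, which copies an Iterator's remaining elements into a collection. If Guava were not already on the classpath, a JDK-only equivalent (assumption: Java 8+) would be:

    ArrayList<WorkChunk> chunks = new ArrayList<>();
    workChunks.forEachRemaining(chunks::add);
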
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 9e26284ad42..0849bfe85ab 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -33,6 +33,7 @@ import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
 import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
 import ca.uhn.fhir.model.api.IModelJson;
 import ca.uhn.fhir.util.Logs;
+import ca.uhn.fhir.util.StopWatch;
 import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;
 
@@ -68,6 +69,9 @@ public class JobInstanceProcessor {
 	}
 
 	public void process() {
+		ourLog.debug("Starting job processing: {}", myInstanceId);
+		StopWatch stopWatch = new StopWatch();
+
 		JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
 		if (theInstance == null) {
 			return;
 		}
@@ -76,6 +80,8 @@
 		handleCancellation(theInstance);
 		cleanupInstance(theInstance);
 		triggerGatedExecutions(theInstance);
+
+		ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
 	}
 
 	private void handleCancellation(JobInstance theInstance) {
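A note on the logging added to process(): the StopWatch object itself is passed as the SLF4J argument rather than a pre-formatted string. Parameterized logging defers toString() until the logger actually emits the message, so the timing bracket costs almost nothing when debug logging is disabled (assumption: HAPI's StopWatch starts timing at construction and renders elapsed time from toString(), as its use in this patch suggests). An explicit guard would be equivalent but noisier:

    if (ourLog.isDebugEnabled()) {
        ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
    }
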
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java
index 927dc587526..7bd2162e821 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/progress/JobInstanceProgressCalculator.java
@@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
 import ca.uhn.fhir.batch2.model.StatusEnum;
 import ca.uhn.fhir.batch2.model.WorkChunk;
 import ca.uhn.fhir.util.Logs;
+import ca.uhn.fhir.util.StopWatch;
 import org.slf4j.Logger;
 
 import javax.annotation.Nonnull;
@@ -46,6 +47,9 @@ public class JobInstanceProgressCalculator {
 
 	public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
 		String instanceId = theInstance.getInstanceId();
+		StopWatch stopWatch = new StopWatch();
+		ourLog.trace("calculating progress: {}", instanceId);
+
 		InstanceProgress instanceProgress = calculateInstanceProgress(instanceId);
 
@@ -76,11 +80,13 @@
 			}
 		}
 
+		ourLog.trace("calculating progress: {} - complete in {}", instanceId, stopWatch);
 	}
 
 	@Nonnull
 	private InstanceProgress calculateInstanceProgress(String instanceId) {
 		InstanceProgress instanceProgress = new InstanceProgress();
+		// wipmb mb here
 		Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
 
 		while (workChunkIterator.hasNext()) {
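The excerpt ends mid-loop, but the shape of the pass is already visible: the progress calculator iterates every chunk with theWithData=false, reading only statuses and counters. A hedged sketch of that style of pass (InstanceProgress internals are not shown in this patch; the tally below is illustrative only, imports elided):

    Map<StatusEnum, Integer> counts = new EnumMap<>(StatusEnum.class);
    Iterator<WorkChunk> it = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
    while (it.hasNext()) {
        counts.merge(it.next().getStatus(), 1, Integer::sum);
    }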