Avoid fetching work-chunk data (#4622)

* Replace the work-chunk maintenance query with a projection, to avoid fetching CLOB data.
* Add processing-time logging to the maintenance pass.
Author: michaelabuckley, 2023-03-04 12:12:04 -05:00 (committed by GitHub)
Parent: 98b0dc308f
Commit: d5ebd1f667
8 changed files with 148 additions and 75 deletions
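Before the per-file changes, it may help to see the general shape of the fix. The commit's core technique is a JPQL constructor-expression projection: instead of selecting the whole entity, the query lists only the scalar columns it needs, so the work chunk's serialized-data CLOB never leaves the database. A minimal, self-contained sketch of the pattern, with all class and field names invented for illustration (this is not the commit's code, which appears below):

    import java.util.List;

    import javax.persistence.Entity;
    import javax.persistence.Id;
    import javax.persistence.Lob;

    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.data.jpa.repository.Query;

    @Entity
    public class Document {
        @Id
        private Long id;
        private String title;
        @Lob
        private String body; // the large CLOB we want to avoid fetching

        public Document() {
            // no-arg constructor required by Hibernate
        }

        // Projection constructor: parameters must match the SELECT list
        // below in order and type. The body field is simply left null.
        public Document(Long theId, String theTitle) {
            id = theId;
            title = theTitle;
        }
    }

    interface DocumentRepository extends JpaRepository<Document, Long> {
        // "SELECT new" builds the object directly in the query, so the
        // generated SQL never reads the @Lob column. Hibernate resolves the
        // unqualified entity name, as the commit's own query relies on.
        @Query("SELECT new Document(d.id, d.title) FROM Document d")
        List<Document> fetchSummaries();
    }

The commit applies this shape to Batch2WorkChunkEntity itself, adding a partial constructor and projecting into the entity's own class rather than into a separate DTO.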


@@ -0,0 +1,4 @@
+---
+type: perf
+issue: 4622
+title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."


@@ -322,12 +322,24 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 	}
 
 	private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
-		myTxTemplate.executeWithoutResult(tx -> {
-			List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
-			for (Batch2WorkChunkEntity chunk : chunks) {
-				theConsumer.accept(toChunk(chunk, theIncludeData));
-			}
-		});
+		if (theIncludeData) {
+			// I think this is dead: MB
+			myTxTemplate.executeWithoutResult(tx -> {
+				List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
+				for (Batch2WorkChunkEntity chunk : chunks) {
+					theConsumer.accept(toChunk(chunk, theIncludeData));
+				}
+			});
+		} else {
+			// wipmb mb here
+			// a minimally-different path for a prod-fix.
+			myTxTemplate.executeWithoutResult(tx -> {
+				List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksNoData(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
+				for (Batch2WorkChunkEntity chunk : chunks) {
+					theConsumer.accept(toChunk(chunk, theIncludeData));
+				}
+			});
+		}
 	}
 
 	@Override
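As the comments note, the two branches above differ only in which repository query runs inside the transaction; the duplication is deliberate, keeping the production fix minimally different from the old path. Purely as an illustration of that equivalence (this consolidated form is not part of the commit):

    private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
        myTxTemplate.executeWithoutResult(tx -> {
            // The two queries are identical except for the CLOB column.
            List<Batch2WorkChunkEntity> chunks = theIncludeData
                ? myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId)
                : myWorkChunkRepository.fetchChunksNoData(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
            for (Batch2WorkChunkEntity chunk : chunks) {
                theConsumer.accept(toChunk(chunk, theIncludeData));
            }
        });
    }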
@@ -355,6 +367,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
 	 */
 	@Override
 	public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
+		// wipmb mb here
 		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
 	}
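fetchAllWorkChunksIterator hands the page-fetching lambda to a PagingIterator, so callers stream chunks while at most one page is buffered in memory. A self-contained sketch of the paging-iterator pattern follows; it is a hypothetical re-implementation for illustration, and the real ca.uhn.fhir.model.api.PagingIterator internals may differ:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.Consumer;

    class SimplePagingIterator<T> implements Iterator<T> {

        // Mirrors the lambda shape used above: (pageIndex, batchSize, consumer).
        interface PageFetcher<T> {
            void fetchPage(int thePageIndex, int theBatchSize, Consumer<T> theConsumer);
        }

        private static final int PAGE_SIZE = 100;
        private final PageFetcher<T> myFetcher;
        private final LinkedList<T> myBuffer = new LinkedList<>();
        private int myNextPageIndex = 0;
        private boolean myExhausted = false;

        SimplePagingIterator(PageFetcher<T> theFetcher) {
            myFetcher = theFetcher;
        }

        @Override
        public boolean hasNext() {
            // Fetch lazily: only pull the next page when the buffer runs dry.
            while (myBuffer.isEmpty() && !myExhausted) {
                List<T> page = new ArrayList<>();
                myFetcher.fetchPage(myNextPageIndex++, PAGE_SIZE, page::add);
                if (page.isEmpty()) {
                    myExhausted = true; // an empty page means no more data
                }
                myBuffer.addAll(page);
            }
            return !myBuffer.isEmpty();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return myBuffer.removeFirst();
        }
    }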


@@ -37,6 +37,17 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChunkEntity, String> {
 	@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
 	List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
 
+	/**
+	 * A projection query to avoid fetching the CLOB over the wire.
+	 * Otherwise, the same as fetchChunks.
+	 */
+	@Query("SELECT new Batch2WorkChunkEntity(" +
+		"e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," +
+		"e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," +
+		"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed" +
+		") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
+	List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
+
 	@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
 	List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);


@@ -98,6 +98,36 @@ public class Batch2WorkChunkEntity implements Serializable {
 	@Column(name = "ERROR_COUNT", nullable = false)
 	private int myErrorCount;
 
+	/**
+	 * Default constructor for Hibernate.
+	 */
+	public Batch2WorkChunkEntity() {
+	}
+
+	/**
+	 * Projection constructor for the no-data path.
+	 */
+	public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
+								 String theInstanceId, String theTargetStepId, StatusEnum theStatus,
+								 Date theCreateTime, Date theStartTime, Date theUpdateTime, Date theEndTime,
+								 String theErrorMessage, int theErrorCount, Integer theRecordsProcessed) {
+		myId = theId;
+		mySequence = theSequence;
+		myJobDefinitionId = theJobDefinitionId;
+		myJobDefinitionVersion = theJobDefinitionVersion;
+		myInstanceId = theInstanceId;
+		myTargetStepId = theTargetStepId;
+		myStatus = theStatus;
+		myCreateTime = theCreateTime;
+		myStartTime = theStartTime;
+		myUpdateTime = theUpdateTime;
+		myEndTime = theEndTime;
+		myErrorMessage = theErrorMessage;
+		myErrorCount = theErrorCount;
+		myRecordsProcessed = theRecordsProcessed;
+	}
+
 	public int getErrorCount() {
 		return myErrorCount;
 	}

@@ -242,4 +272,5 @@ public class Batch2WorkChunkEntity implements Serializable {
 			.append("errorMessage", myErrorMessage)
 			.toString();
 	}
 }
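A note on why the explicit default constructor above is needed: declaring the projection constructor suppresses Java's implicit no-arg constructor, and Hibernate requires a no-arg constructor to instantiate entities reflectively. A minimal illustration with a hypothetical class:

    class Example {
        private int myValue;

        // Once this constructor exists, Java no longer generates an implicit
        // no-arg constructor; "new Example()" would fail to compile, and
        // Hibernate could not instantiate the class reflectively either.
        Example(int theValue) {
            myValue = theValue;
        }

        // So an explicit no-arg constructor must be declared alongside it.
        Example() {
        }
    }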


@@ -4,40 +4,31 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -192,66 +183,6 @@ class JpaJobPersistenceImplTest {
 		}
 	}
 
-	@Test
-	public void fetchAllWorkChunksIterator_withValidIdAndBoolToSayToIncludeData_returnsPagingIterator() {
-		// setup
-		String instanceId = "instanceId";
-		String jobDefinition = "definitionId";
-		int version = 1;
-		String targetStep = "step";
-		List<Batch2WorkChunkEntity> workChunkEntityList = new ArrayList<>();
-		Batch2WorkChunkEntity chunk1 = new Batch2WorkChunkEntity();
-		chunk1.setId("id1");
-		chunk1.setJobDefinitionVersion(version);
-		chunk1.setJobDefinitionId(jobDefinition);
-		chunk1.setSerializedData("serialized data 1");
-		chunk1.setTargetStepId(targetStep);
-		workChunkEntityList.add(chunk1);
-		Batch2WorkChunkEntity chunk2 = new Batch2WorkChunkEntity();
-		chunk2.setId("id2");
-		chunk2.setSerializedData("serialized data 2");
-		chunk2.setJobDefinitionId(jobDefinition);
-		chunk2.setJobDefinitionVersion(version);
-		chunk2.setTargetStepId(targetStep);
-		workChunkEntityList.add(chunk2);
-
-		for (boolean includeData : new boolean[] { true, false }) {
-			// when
-			when(myWorkChunkRepository.fetchChunks(any(PageRequest.class), eq(instanceId)))
-				.thenReturn(workChunkEntityList);
-
-			// test
-			Iterator<WorkChunk> chunkIterator = mySvc.fetchAllWorkChunksIterator(instanceId, includeData);
-
-			// verify
-			assertTrue(chunkIterator instanceof PagingIterator);
-			verify(myWorkChunkRepository, never())
-				.fetchChunks(any(PageRequest.class), anyString());
-
-			// now try the iterator out...
-			WorkChunk chunk = chunkIterator.next();
-			assertEquals(chunk1.getId(), chunk.getId());
-			if (includeData) {
-				assertEquals(chunk1.getSerializedData(), chunk.getData());
-			} else {
-				assertNull(chunk.getData());
-			}
-			chunk = chunkIterator.next();
-			assertEquals(chunk2.getId(), chunk.getId());
-			if (includeData) {
-				assertEquals(chunk2.getSerializedData(), chunk.getData());
-			} else {
-				assertNull(chunk.getData());
-			}
-
-			verify(myWorkChunkRepository)
-				.fetchChunks(any(PageRequest.class), eq(instanceId));
-			reset(myWorkChunkRepository);
-		}
-	}
-
 	@Test
 	public void fetchInstances_validRequest_returnsFoundInstances() {
 		// setup


@@ -14,6 +14,7 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
 import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
 import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
 import ca.uhn.fhir.util.JsonUtil;
+import com.google.common.collect.Iterators;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
@@ -358,6 +359,76 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
 		assertNull(chunk.getData());
 	}
 
+	@Test
+	void testStoreAndFetchChunksForInstance_NoData() {
+		// wipmb here
+		// given
+		JobInstance instance = createInstance();
+		String instanceId = mySvc.storeNewInstance(instance);
+		String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
+		String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
+		String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");
+
+		mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(erroredId);
+		MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
+		parameters.setChunkId(erroredId);
+		parameters.setErrorMsg("Our error message");
+		mySvc.markWorkChunkAsErroredAndIncrementErrorCount(parameters);
+
+		mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(completedId);
+		mySvc.markWorkChunkAsCompletedAndClearData(instanceId, completedId, 11);
+
+		// when
+		Iterator<WorkChunk> workChunks = mySvc.fetchAllWorkChunksIterator(instanceId, false);
+
+		// then
+		ArrayList<WorkChunk> chunks = new ArrayList<>();
+		Iterators.addAll(chunks, workChunks);
+		assertEquals(3, chunks.size());
+
+		{
+			WorkChunk workChunk = chunks.get(0);
+			assertNull(workChunk.getData(), "we skip the data");
+			assertEquals(queuedId, workChunk.getId());
+			assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
+			assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
+			assertEquals(instanceId, workChunk.getInstanceId());
+			assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
+			assertEquals(0, workChunk.getSequence());
+			assertEquals(StatusEnum.QUEUED, workChunk.getStatus());
+			assertNotNull(workChunk.getCreateTime());
+			assertNotNull(workChunk.getStartTime());
+			assertNotNull(workChunk.getUpdateTime());
+			assertNull(workChunk.getEndTime());
+			assertNull(workChunk.getErrorMessage());
+			assertEquals(0, workChunk.getErrorCount());
+			assertEquals(null, workChunk.getRecordsProcessed());
+		}
+
+		{
+			WorkChunk workChunk1 = chunks.get(1);
+			assertEquals(StatusEnum.ERRORED, workChunk1.getStatus());
+			assertEquals("Our error message", workChunk1.getErrorMessage());
+			assertEquals(1, workChunk1.getErrorCount());
+			assertEquals(null, workChunk1.getRecordsProcessed());
+			assertNotNull(workChunk1.getEndTime());
+		}
+
+		{
+			WorkChunk workChunk2 = chunks.get(2);
+			assertEquals(StatusEnum.COMPLETED, workChunk2.getStatus());
+			assertNotNull(workChunk2.getEndTime());
+			assertEquals(11, workChunk2.getRecordsProcessed());
+			assertNull(workChunk2.getErrorMessage());
+			assertEquals(0, workChunk2.getErrorCount());
+		}
+	}
+
 	@Test
 	public void testStoreAndFetchWorkChunk_WithData() {
 		JobInstance instance = createInstance();


@@ -33,6 +33,7 @@ import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
 import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
 import ca.uhn.fhir.model.api.IModelJson;
 import ca.uhn.fhir.util.Logs;
+import ca.uhn.fhir.util.StopWatch;
 import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;

@@ -68,6 +69,9 @@ public class JobInstanceProcessor {
 	}
 
 	public void process() {
+		ourLog.debug("Starting job processing: {}", myInstanceId);
+		StopWatch stopWatch = new StopWatch();
+
 		JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
 		if (theInstance == null) {
 			return;

@@ -76,6 +80,8 @@
 		handleCancellation(theInstance);
 		cleanupInstance(theInstance);
 		triggerGatedExecutions(theInstance);
+
+		ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
 	}
 
 	private void handleCancellation(JobInstance theInstance) {


@@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;

import javax.annotation.Nonnull;

@@ -46,6 +47,9 @@ public class JobInstanceProgressCalculator {
	public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
		String instanceId = theInstance.getInstanceId();
		StopWatch stopWatch = new StopWatch();
		ourLog.trace("calculating progress: {}", instanceId);

		InstanceProgress instanceProgress = calculateInstanceProgress(instanceId);

@@ -76,11 +80,13 @@
			}
		}

		ourLog.trace("calculating progress: {} - complete in {}", instanceId, stopWatch);
	}

	@Nonnull
	private InstanceProgress calculateInstanceProgress(String instanceId) {
		InstanceProgress instanceProgress = new InstanceProgress();
		// wipmb mb here
		Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
		while (workChunkIterator.hasNext()) {