Avoid fetching work-chunk data (#4622)
* Replace work-chunk maintenance query with projection to avoid fetching clob data. * Add processing logging
This commit is contained in:
parent
98b0dc308f
commit
d5ebd1f667
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
type: perf
|
||||
issue: 4622
|
||||
title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."
|
|
@ -322,12 +322,24 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
|
|||
}
|
||||
|
||||
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
|
||||
myTxTemplate.executeWithoutResult(tx -> {
|
||||
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
|
||||
for (Batch2WorkChunkEntity chunk : chunks) {
|
||||
theConsumer.accept(toChunk(chunk, theIncludeData));
|
||||
}
|
||||
});
|
||||
if (theIncludeData) {
|
||||
// I think this is dead: MB
|
||||
myTxTemplate.executeWithoutResult(tx -> {
|
||||
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
|
||||
for (Batch2WorkChunkEntity chunk : chunks) {
|
||||
theConsumer.accept(toChunk(chunk, theIncludeData));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// wipmb mb here
|
||||
// a minimally-different path for a prod-fix.
|
||||
myTxTemplate.executeWithoutResult(tx -> {
|
||||
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksNoData(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
|
||||
for (Batch2WorkChunkEntity chunk : chunks) {
|
||||
theConsumer.accept(toChunk(chunk, theIncludeData));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -355,6 +367,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
|
|||
*/
|
||||
	@Override
	public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
		// Returns a lazy paging iterator: no query runs until iteration begins,
		// and each page is loaded on demand by fetchChunks. When theWithData is
		// false, fetchChunks uses the projection query that skips the data CLOB.
		// (Replaces stale "wipmb mb here" work-in-progress marker.)
		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
	}
|
||||
|
||||
|
|
|
@ -37,6 +37,17 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
|
|||
	/**
	 * Fetch one page of work chunks for an instance, ordered by sequence,
	 * including the serialized-data CLOB. Prefer {@code fetchChunksNoData}
	 * when the data payload is not needed.
	 */
	@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
	List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
|
||||
|
||||
/**
|
||||
* A projection query to avoid fetching the CLOB over the wire.
|
||||
* Otherwise, the same as fetchChunks.
|
||||
*/
|
||||
@Query("SELECT new Batch2WorkChunkEntity(" +
|
||||
"e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," +
|
||||
"e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," +
|
||||
"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed" +
|
||||
") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
|
||||
List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
|
||||
|
||||
	/**
	 * Return the distinct set of chunk statuses present for one step of one
	 * instance - used to decide step completion without loading the chunks.
	 */
	@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
	List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
|
||||
|
||||
|
|
|
@ -98,6 +98,36 @@ public class Batch2WorkChunkEntity implements Serializable {
|
|||
@Column(name = "ERROR_COUNT", nullable = false)
|
||||
private int myErrorCount;
|
||||
|
||||
|
||||
	/**
	 * Default constructor for Hibernate - required for entity instantiation;
	 * do not remove even though it appears unused.
	 */
	public Batch2WorkChunkEntity() {
	}
|
||||
|
||||
	/**
	 * Projection constructor for the no-data path (fixes "no-date" typo):
	 * populates every column except the serialized-data CLOB, which is
	 * deliberately left null so the projection query never fetches it.
	 * Invoked by the JPQL constructor expression in
	 * IBatch2WorkChunkRepository#fetchChunksNoData - the parameter order must
	 * match that query exactly.
	 */
	public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
								 String theInstanceId, String theTargetStepId, StatusEnum theStatus,
								 Date theCreateTime, Date theStartTime, Date theUpdateTime, Date theEndTime,
								 String theErrorMessage, int theErrorCount, Integer theRecordsProcessed) {
		myId = theId;
		mySequence = theSequence;
		myJobDefinitionId = theJobDefinitionId;
		myJobDefinitionVersion = theJobDefinitionVersion;
		myInstanceId = theInstanceId;
		myTargetStepId = theTargetStepId;
		myStatus = theStatus;
		myCreateTime = theCreateTime;
		myStartTime = theStartTime;
		myUpdateTime = theUpdateTime;
		myEndTime = theEndTime;
		myErrorMessage = theErrorMessage;
		myErrorCount = theErrorCount;
		myRecordsProcessed = theRecordsProcessed;
	}
|
||||
|
||||
	/** @return the number of times this chunk has errored (never negative; column is NOT NULL) */
	public int getErrorCount() {
		return myErrorCount;
	}
|
||||
|
@ -242,4 +272,5 @@ public class Batch2WorkChunkEntity implements Serializable {
|
|||
.append("errorMessage", myErrorMessage)
|
||||
.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -4,40 +4,31 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
|
|||
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
|
||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||
import ca.uhn.fhir.batch2.model.StatusEnum;
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
|
||||
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
|
||||
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
|
||||
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
|
||||
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
|
||||
import ca.uhn.fhir.model.api.PagingIterator;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -192,66 +183,6 @@ class JpaJobPersistenceImplTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchAllWorkChunksIterator_withValidIdAndBoolToSayToIncludeData_returnsPagingIterator() {
|
||||
// setup
|
||||
String instanceId = "instanceId";
|
||||
String jobDefinition = "definitionId";
|
||||
int version = 1;
|
||||
String targetStep = "step";
|
||||
|
||||
List<Batch2WorkChunkEntity> workChunkEntityList = new ArrayList<>();
|
||||
Batch2WorkChunkEntity chunk1 = new Batch2WorkChunkEntity();
|
||||
chunk1.setId("id1");
|
||||
chunk1.setJobDefinitionVersion(version);
|
||||
chunk1.setJobDefinitionId(jobDefinition);
|
||||
chunk1.setSerializedData("serialized data 1");
|
||||
chunk1.setTargetStepId(targetStep);
|
||||
workChunkEntityList.add(chunk1);
|
||||
Batch2WorkChunkEntity chunk2 = new Batch2WorkChunkEntity();
|
||||
chunk2.setId("id2");
|
||||
chunk2.setSerializedData("serialized data 2");
|
||||
chunk2.setJobDefinitionId(jobDefinition);
|
||||
chunk2.setJobDefinitionVersion(version);
|
||||
chunk2.setTargetStepId(targetStep);
|
||||
workChunkEntityList.add(chunk2);
|
||||
|
||||
for (boolean includeData : new boolean[] { true , false }) {
|
||||
// when
|
||||
when(myWorkChunkRepository.fetchChunks(any(PageRequest.class), eq(instanceId)))
|
||||
.thenReturn(workChunkEntityList);
|
||||
|
||||
// test
|
||||
Iterator<WorkChunk> chunkIterator = mySvc.fetchAllWorkChunksIterator(instanceId, includeData);
|
||||
|
||||
// verify
|
||||
assertTrue(chunkIterator instanceof PagingIterator);
|
||||
verify(myWorkChunkRepository, never())
|
||||
.fetchChunks(any(PageRequest.class), anyString());
|
||||
|
||||
// now try the iterator out...
|
||||
WorkChunk chunk = chunkIterator.next();
|
||||
assertEquals(chunk1.getId(), chunk.getId());
|
||||
if (includeData) {
|
||||
assertEquals(chunk1.getSerializedData(), chunk.getData());
|
||||
} else {
|
||||
assertNull(chunk.getData());
|
||||
}
|
||||
chunk = chunkIterator.next();
|
||||
assertEquals(chunk2.getId(), chunk.getId());
|
||||
if (includeData) {
|
||||
assertEquals(chunk2.getSerializedData(), chunk.getData());
|
||||
} else {
|
||||
assertNull(chunk.getData());
|
||||
}
|
||||
|
||||
verify(myWorkChunkRepository)
|
||||
.fetchChunks(any(PageRequest.class), eq(instanceId));
|
||||
|
||||
reset(myWorkChunkRepository);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchInstances_validRequest_returnsFoundInstances() {
|
||||
// setup
|
||||
|
|
|
@ -14,6 +14,7 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
|
|||
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
|
||||
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
|
||||
import ca.uhn.fhir.util.JsonUtil;
|
||||
import com.google.common.collect.Iterators;
|
||||
import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
|
@ -358,6 +359,76 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
|
|||
assertNull(chunk.getData());
|
||||
}
|
||||
|
||||
	/**
	 * Round-trip test for the no-data chunk iterator: stores three chunks, moves
	 * them into QUEUED / ERRORED / COMPLETED states, then fetches them with
	 * theWithData=false and asserts every column EXCEPT the data payload is
	 * populated. (Replaces stale "wipmb here" work-in-progress marker.)
	 */
	@Test
	void testStoreAndFetchChunksForInstance_NoData() {
		// given - one instance with three chunks in distinct lifecycle states
		JobInstance instance = createInstance();
		String instanceId = mySvc.storeNewInstance(instance);

		String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
		String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
		String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");

		// drive chunk 1 to ERRORED
		mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(erroredId);
		MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
		parameters.setChunkId(erroredId);
		parameters.setErrorMsg("Our error message");
		mySvc.markWorkChunkAsErroredAndIncrementErrorCount(parameters);

		// drive chunk 2 to COMPLETED with 11 records processed
		mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(completedId);
		mySvc.markWorkChunkAsCompletedAndClearData(instanceId, completedId, 11);

		// when - fetch with theWithData=false (the projection path)
		Iterator<WorkChunk> workChunks = mySvc.fetchAllWorkChunksIterator(instanceId, false);

		// then
		ArrayList<WorkChunk> chunks = new ArrayList<>();
		Iterators.addAll(chunks, workChunks);
		assertEquals(3, chunks.size());

		{
			// chunk 0: still QUEUED; everything but the data payload is present
			WorkChunk workChunk = chunks.get(0);
			assertNull(workChunk.getData(), "we skip the data");
			assertEquals(queuedId, workChunk.getId());
			assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
			assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
			assertEquals(instanceId, workChunk.getInstanceId());
			assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
			assertEquals(0, workChunk.getSequence());
			assertEquals(StatusEnum.QUEUED, workChunk.getStatus());


			assertNotNull(workChunk.getCreateTime());
			assertNotNull(workChunk.getStartTime());
			assertNotNull(workChunk.getUpdateTime());
			assertNull(workChunk.getEndTime());
			assertNull(workChunk.getErrorMessage());
			assertEquals(0, workChunk.getErrorCount());
			assertEquals(null, workChunk.getRecordsProcessed());
		}

		{
			// chunk 1: ERRORED with message and incremented error count
			WorkChunk workChunk1 = chunks.get(1);
			assertEquals(StatusEnum.ERRORED, workChunk1.getStatus());
			assertEquals("Our error message", workChunk1.getErrorMessage());
			assertEquals(1, workChunk1.getErrorCount());
			assertEquals(null, workChunk1.getRecordsProcessed());
			assertNotNull(workChunk1.getEndTime());
		}

		{
			// chunk 2: COMPLETED with a record count and no error state
			WorkChunk workChunk2 = chunks.get(2);
			assertEquals(StatusEnum.COMPLETED, workChunk2.getStatus());
			assertNotNull(workChunk2.getEndTime());
			assertEquals(11, workChunk2.getRecordsProcessed());
			assertNull(workChunk2.getErrorMessage());
			assertEquals(0, workChunk2.getErrorCount());
		}

	}
|
||||
|
||||
|
||||
@Test
|
||||
public void testStoreAndFetchWorkChunk_WithData() {
|
||||
JobInstance instance = createInstance();
|
||||
|
|
|
@ -33,6 +33,7 @@ import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
|
|||
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
|
||||
import ca.uhn.fhir.model.api.IModelJson;
|
||||
import ca.uhn.fhir.util.Logs;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
|
@ -68,6 +69,9 @@ public class JobInstanceProcessor {
|
|||
}
|
||||
|
||||
public void process() {
|
||||
ourLog.debug("Starting job processing: {}", myInstanceId);
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
|
||||
JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
|
||||
if (theInstance == null) {
|
||||
return;
|
||||
|
@ -76,6 +80,8 @@ public class JobInstanceProcessor {
|
|||
handleCancellation(theInstance);
|
||||
cleanupInstance(theInstance);
|
||||
triggerGatedExecutions(theInstance);
|
||||
|
||||
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
|
||||
}
|
||||
|
||||
private void handleCancellation(JobInstance theInstance) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
|
|||
import ca.uhn.fhir.batch2.model.StatusEnum;
|
||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||
import ca.uhn.fhir.util.Logs;
|
||||
import ca.uhn.fhir.util.StopWatch;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
@ -46,6 +47,9 @@ public class JobInstanceProgressCalculator {
|
|||
|
||||
public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
|
||||
String instanceId = theInstance.getInstanceId();
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
ourLog.trace("calculating progress: {}", instanceId);
|
||||
|
||||
|
||||
InstanceProgress instanceProgress = calculateInstanceProgress(instanceId);
|
||||
|
||||
|
@ -76,11 +80,13 @@ public class JobInstanceProgressCalculator {
|
|||
}
|
||||
|
||||
}
|
||||
ourLog.trace("calculating progress: {} - complete in {}", instanceId, stopWatch);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private InstanceProgress calculateInstanceProgress(String instanceId) {
|
||||
InstanceProgress instanceProgress = new InstanceProgress();
|
||||
// wipmb mb here
|
||||
Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
|
||||
|
||||
while (workChunkIterator.hasNext()) {
|
||||
|
|
Loading…
Reference in New Issue