Avoid fetching work-chunk data (#4622)
* Replace work-chunk maintenance query with projection to avoid fetching clob data. * Add processing logging
This commit is contained in:
parent
98b0dc308f
commit
d5ebd1f667
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
type: perf
|
||||||
|
issue: 4622
|
||||||
|
title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."
|
|
@ -322,12 +322,24 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
|
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
|
||||||
|
if (theIncludeData) {
|
||||||
|
// I think this is dead: MB
|
||||||
myTxTemplate.executeWithoutResult(tx -> {
|
myTxTemplate.executeWithoutResult(tx -> {
|
||||||
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
|
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
|
||||||
for (Batch2WorkChunkEntity chunk : chunks) {
|
for (Batch2WorkChunkEntity chunk : chunks) {
|
||||||
theConsumer.accept(toChunk(chunk, theIncludeData));
|
theConsumer.accept(toChunk(chunk, theIncludeData));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
// wipmb mb here
|
||||||
|
// a minimally-different path for a prod-fix.
|
||||||
|
myTxTemplate.executeWithoutResult(tx -> {
|
||||||
|
List<Batch2WorkChunkEntity> chunks = myWorkChunkRepository.fetchChunksNoData(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
|
||||||
|
for (Batch2WorkChunkEntity chunk : chunks) {
|
||||||
|
theConsumer.accept(toChunk(chunk, theIncludeData));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -355,6 +367,7 @@ public class JpaJobPersistenceImpl implements IJobPersistence {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
|
public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
|
||||||
|
// wipmb mb here
|
||||||
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
|
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,17 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
|
||||||
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
|
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
|
||||||
List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
|
List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A projection query to avoid fetching the CLOB over the wire.
|
||||||
|
* Otherwise, the same as fetchChunks.
|
||||||
|
*/
|
||||||
|
@Query("SELECT new Batch2WorkChunkEntity(" +
|
||||||
|
"e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," +
|
||||||
|
"e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," +
|
||||||
|
"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed" +
|
||||||
|
") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
|
||||||
|
List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
|
||||||
|
|
||||||
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
|
@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
|
||||||
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
|
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
|
||||||
|
|
||||||
|
|
|
@ -98,6 +98,36 @@ public class Batch2WorkChunkEntity implements Serializable {
|
||||||
@Column(name = "ERROR_COUNT", nullable = false)
|
@Column(name = "ERROR_COUNT", nullable = false)
|
||||||
private int myErrorCount;
|
private int myErrorCount;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default constructor for Hibernate.
|
||||||
|
*/
|
||||||
|
public Batch2WorkChunkEntity() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Projection constructor for no-date path.
|
||||||
|
*/
|
||||||
|
public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
|
||||||
|
String theInstanceId, String theTargetStepId, StatusEnum theStatus,
|
||||||
|
Date theCreateTime, Date theStartTime, Date theUpdateTime, Date theEndTime,
|
||||||
|
String theErrorMessage, int theErrorCount, Integer theRecordsProcessed) {
|
||||||
|
myId = theId;
|
||||||
|
mySequence = theSequence;
|
||||||
|
myJobDefinitionId = theJobDefinitionId;
|
||||||
|
myJobDefinitionVersion = theJobDefinitionVersion;
|
||||||
|
myInstanceId = theInstanceId;
|
||||||
|
myTargetStepId = theTargetStepId;
|
||||||
|
myStatus = theStatus;
|
||||||
|
myCreateTime = theCreateTime;
|
||||||
|
myStartTime = theStartTime;
|
||||||
|
myUpdateTime = theUpdateTime;
|
||||||
|
myEndTime = theEndTime;
|
||||||
|
myErrorMessage = theErrorMessage;
|
||||||
|
myErrorCount = theErrorCount;
|
||||||
|
myRecordsProcessed = theRecordsProcessed;
|
||||||
|
}
|
||||||
|
|
||||||
public int getErrorCount() {
|
public int getErrorCount() {
|
||||||
return myErrorCount;
|
return myErrorCount;
|
||||||
}
|
}
|
||||||
|
@ -242,4 +272,5 @@ public class Batch2WorkChunkEntity implements Serializable {
|
||||||
.append("errorMessage", myErrorMessage)
|
.append("errorMessage", myErrorMessage)
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,40 +4,31 @@ import ca.uhn.fhir.batch2.api.JobOperationResultJson;
|
||||||
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
|
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
|
||||||
import ca.uhn.fhir.batch2.model.JobInstance;
|
import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.StatusEnum;
|
import ca.uhn.fhir.batch2.model.StatusEnum;
|
||||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
|
||||||
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
|
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
|
||||||
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
|
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
|
||||||
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
|
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
|
||||||
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
|
|
||||||
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
|
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
|
||||||
import ca.uhn.fhir.model.api.PagingIterator;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.InjectMocks;
|
import org.mockito.InjectMocks;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import org.springframework.data.domain.PageRequest;
|
|
||||||
import org.springframework.data.domain.Pageable;
|
import org.springframework.data.domain.Pageable;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.reset;
|
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -192,66 +183,6 @@ class JpaJobPersistenceImplTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void fetchAllWorkChunksIterator_withValidIdAndBoolToSayToIncludeData_returnsPagingIterator() {
|
|
||||||
// setup
|
|
||||||
String instanceId = "instanceId";
|
|
||||||
String jobDefinition = "definitionId";
|
|
||||||
int version = 1;
|
|
||||||
String targetStep = "step";
|
|
||||||
|
|
||||||
List<Batch2WorkChunkEntity> workChunkEntityList = new ArrayList<>();
|
|
||||||
Batch2WorkChunkEntity chunk1 = new Batch2WorkChunkEntity();
|
|
||||||
chunk1.setId("id1");
|
|
||||||
chunk1.setJobDefinitionVersion(version);
|
|
||||||
chunk1.setJobDefinitionId(jobDefinition);
|
|
||||||
chunk1.setSerializedData("serialized data 1");
|
|
||||||
chunk1.setTargetStepId(targetStep);
|
|
||||||
workChunkEntityList.add(chunk1);
|
|
||||||
Batch2WorkChunkEntity chunk2 = new Batch2WorkChunkEntity();
|
|
||||||
chunk2.setId("id2");
|
|
||||||
chunk2.setSerializedData("serialized data 2");
|
|
||||||
chunk2.setJobDefinitionId(jobDefinition);
|
|
||||||
chunk2.setJobDefinitionVersion(version);
|
|
||||||
chunk2.setTargetStepId(targetStep);
|
|
||||||
workChunkEntityList.add(chunk2);
|
|
||||||
|
|
||||||
for (boolean includeData : new boolean[] { true , false }) {
|
|
||||||
// when
|
|
||||||
when(myWorkChunkRepository.fetchChunks(any(PageRequest.class), eq(instanceId)))
|
|
||||||
.thenReturn(workChunkEntityList);
|
|
||||||
|
|
||||||
// test
|
|
||||||
Iterator<WorkChunk> chunkIterator = mySvc.fetchAllWorkChunksIterator(instanceId, includeData);
|
|
||||||
|
|
||||||
// verify
|
|
||||||
assertTrue(chunkIterator instanceof PagingIterator);
|
|
||||||
verify(myWorkChunkRepository, never())
|
|
||||||
.fetchChunks(any(PageRequest.class), anyString());
|
|
||||||
|
|
||||||
// now try the iterator out...
|
|
||||||
WorkChunk chunk = chunkIterator.next();
|
|
||||||
assertEquals(chunk1.getId(), chunk.getId());
|
|
||||||
if (includeData) {
|
|
||||||
assertEquals(chunk1.getSerializedData(), chunk.getData());
|
|
||||||
} else {
|
|
||||||
assertNull(chunk.getData());
|
|
||||||
}
|
|
||||||
chunk = chunkIterator.next();
|
|
||||||
assertEquals(chunk2.getId(), chunk.getId());
|
|
||||||
if (includeData) {
|
|
||||||
assertEquals(chunk2.getSerializedData(), chunk.getData());
|
|
||||||
} else {
|
|
||||||
assertNull(chunk.getData());
|
|
||||||
}
|
|
||||||
|
|
||||||
verify(myWorkChunkRepository)
|
|
||||||
.fetchChunks(any(PageRequest.class), eq(instanceId));
|
|
||||||
|
|
||||||
reset(myWorkChunkRepository);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void fetchInstances_validRequest_returnsFoundInstances() {
|
public void fetchInstances_validRequest_returnsFoundInstances() {
|
||||||
// setup
|
// setup
|
||||||
|
|
|
@ -14,6 +14,7 @@ import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
|
||||||
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
|
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
|
||||||
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
|
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
|
||||||
import ca.uhn.fhir.util.JsonUtil;
|
import ca.uhn.fhir.util.JsonUtil;
|
||||||
|
import com.google.common.collect.Iterators;
|
||||||
import org.junit.jupiter.api.MethodOrderer;
|
import org.junit.jupiter.api.MethodOrderer;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.TestMethodOrder;
|
import org.junit.jupiter.api.TestMethodOrder;
|
||||||
|
@ -358,6 +359,76 @@ public class JpaJobPersistenceImplTest extends BaseJpaR4Test {
|
||||||
assertNull(chunk.getData());
|
assertNull(chunk.getData());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testStoreAndFetchChunksForInstance_NoData() {
|
||||||
|
//wipmb here
|
||||||
|
// given
|
||||||
|
JobInstance instance = createInstance();
|
||||||
|
String instanceId = mySvc.storeNewInstance(instance);
|
||||||
|
|
||||||
|
String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
|
||||||
|
String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
|
||||||
|
String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");
|
||||||
|
|
||||||
|
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(erroredId);
|
||||||
|
MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
|
||||||
|
parameters.setChunkId(erroredId);
|
||||||
|
parameters.setErrorMsg("Our error message");
|
||||||
|
mySvc.markWorkChunkAsErroredAndIncrementErrorCount(parameters);
|
||||||
|
|
||||||
|
mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(completedId);
|
||||||
|
mySvc.markWorkChunkAsCompletedAndClearData(instanceId, completedId, 11);
|
||||||
|
|
||||||
|
// when
|
||||||
|
Iterator<WorkChunk> workChunks = mySvc.fetchAllWorkChunksIterator(instanceId, false);
|
||||||
|
|
||||||
|
// then
|
||||||
|
ArrayList<WorkChunk> chunks = new ArrayList<>();
|
||||||
|
Iterators.addAll(chunks, workChunks);
|
||||||
|
assertEquals(3, chunks.size());
|
||||||
|
|
||||||
|
{
|
||||||
|
WorkChunk workChunk = chunks.get(0);
|
||||||
|
assertNull(workChunk.getData(), "we skip the data");
|
||||||
|
assertEquals(queuedId, workChunk.getId());
|
||||||
|
assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
|
||||||
|
assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
|
||||||
|
assertEquals(instanceId, workChunk.getInstanceId());
|
||||||
|
assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
|
||||||
|
assertEquals(0, workChunk.getSequence());
|
||||||
|
assertEquals(StatusEnum.QUEUED, workChunk.getStatus());
|
||||||
|
|
||||||
|
|
||||||
|
assertNotNull(workChunk.getCreateTime());
|
||||||
|
assertNotNull(workChunk.getStartTime());
|
||||||
|
assertNotNull(workChunk.getUpdateTime());
|
||||||
|
assertNull(workChunk.getEndTime());
|
||||||
|
assertNull(workChunk.getErrorMessage());
|
||||||
|
assertEquals(0, workChunk.getErrorCount());
|
||||||
|
assertEquals(null, workChunk.getRecordsProcessed());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
WorkChunk workChunk1 = chunks.get(1);
|
||||||
|
assertEquals(StatusEnum.ERRORED, workChunk1.getStatus());
|
||||||
|
assertEquals("Our error message", workChunk1.getErrorMessage());
|
||||||
|
assertEquals(1, workChunk1.getErrorCount());
|
||||||
|
assertEquals(null, workChunk1.getRecordsProcessed());
|
||||||
|
assertNotNull(workChunk1.getEndTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
WorkChunk workChunk2 = chunks.get(2);
|
||||||
|
assertEquals(StatusEnum.COMPLETED, workChunk2.getStatus());
|
||||||
|
assertNotNull(workChunk2.getEndTime());
|
||||||
|
assertEquals(11, workChunk2.getRecordsProcessed());
|
||||||
|
assertNull(workChunk2.getErrorMessage());
|
||||||
|
assertEquals(0, workChunk2.getErrorCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStoreAndFetchWorkChunk_WithData() {
|
public void testStoreAndFetchWorkChunk_WithData() {
|
||||||
JobInstance instance = createInstance();
|
JobInstance instance = createInstance();
|
||||||
|
|
|
@ -33,6 +33,7 @@ import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
|
||||||
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
|
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
|
||||||
import ca.uhn.fhir.model.api.IModelJson;
|
import ca.uhn.fhir.model.api.IModelJson;
|
||||||
import ca.uhn.fhir.util.Logs;
|
import ca.uhn.fhir.util.Logs;
|
||||||
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
@ -68,6 +69,9 @@ public class JobInstanceProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void process() {
|
public void process() {
|
||||||
|
ourLog.debug("Starting job processing: {}", myInstanceId);
|
||||||
|
StopWatch stopWatch = new StopWatch();
|
||||||
|
|
||||||
JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
|
JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
|
||||||
if (theInstance == null) {
|
if (theInstance == null) {
|
||||||
return;
|
return;
|
||||||
|
@ -76,6 +80,8 @@ public class JobInstanceProcessor {
|
||||||
handleCancellation(theInstance);
|
handleCancellation(theInstance);
|
||||||
cleanupInstance(theInstance);
|
cleanupInstance(theInstance);
|
||||||
triggerGatedExecutions(theInstance);
|
triggerGatedExecutions(theInstance);
|
||||||
|
|
||||||
|
ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleCancellation(JobInstance theInstance) {
|
private void handleCancellation(JobInstance theInstance) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import ca.uhn.fhir.batch2.model.JobInstance;
|
||||||
import ca.uhn.fhir.batch2.model.StatusEnum;
|
import ca.uhn.fhir.batch2.model.StatusEnum;
|
||||||
import ca.uhn.fhir.batch2.model.WorkChunk;
|
import ca.uhn.fhir.batch2.model.WorkChunk;
|
||||||
import ca.uhn.fhir.util.Logs;
|
import ca.uhn.fhir.util.Logs;
|
||||||
|
import ca.uhn.fhir.util.StopWatch;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
|
@ -46,6 +47,9 @@ public class JobInstanceProgressCalculator {
|
||||||
|
|
||||||
public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
|
public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
|
||||||
String instanceId = theInstance.getInstanceId();
|
String instanceId = theInstance.getInstanceId();
|
||||||
|
StopWatch stopWatch = new StopWatch();
|
||||||
|
ourLog.trace("calculating progress: {}", instanceId);
|
||||||
|
|
||||||
|
|
||||||
InstanceProgress instanceProgress = calculateInstanceProgress(instanceId);
|
InstanceProgress instanceProgress = calculateInstanceProgress(instanceId);
|
||||||
|
|
||||||
|
@ -76,11 +80,13 @@ public class JobInstanceProgressCalculator {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
ourLog.trace("calculating progress: {} - complete in {}", instanceId, stopWatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private InstanceProgress calculateInstanceProgress(String instanceId) {
|
private InstanceProgress calculateInstanceProgress(String instanceId) {
|
||||||
InstanceProgress instanceProgress = new InstanceProgress();
|
InstanceProgress instanceProgress = new InstanceProgress();
|
||||||
|
// wipmb mb here
|
||||||
Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
|
Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);
|
||||||
|
|
||||||
while (workChunkIterator.hasNext()) {
|
while (workChunkIterator.hasNext()) {
|
||||||
|
|
Loading…
Reference in New Issue