Fix batching of mdm-clear tasks (#3295)

* Fix batching of mdm-clear tasks.

* Fix batching of mdm-clear tasks.

Co-authored-by: ianmarshall <ian@simpatico.ai>
This commit is contained in:
IanMMarshall 2022-01-13 17:06:27 +00:00 committed by GitHub
parent 55ad98d061
commit e6fd5c9807
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 4 deletions

View File

@ -0,0 +1,5 @@
---
type: fix
issue: 3295
title: "When $mdm-clear operation batch was split into multiple threads, ResourceVersionConflictExceptions were being
thrown. This issue has been fixed."

View File

@ -676,8 +676,8 @@ This operation takes two optional Parameters.
<td>Integer</td>
<td>0..1</td>
<td>
The number of links that should be deleted at a time. If ommitted, then the batch size will be determined by the value
of [Expunge Batch Size](/apidocs/hapi-fhir-storage/ca/uhn/fhir/jpa/api/config/DaoConfig.html#getExpungeBatchSize())
The number of links that should be deleted at a time. If omitted, then the batch size will be determined by the value
of [Reindex Batch Size](/apidocs/hapi-fhir-storage/ca/uhn/fhir/jpa/api/config/DaoConfig.html#getReindexBatchSize())
property.
</td>
</tr>

View File

@ -58,7 +58,7 @@ public class MdmLinkDeleter implements ItemProcessor<List<Long>, List<Long>> {
public List<Long> process(List<Long> thePidList) throws Exception {
ConcurrentLinkedQueue<Long> goldenPidAggregator = new ConcurrentLinkedQueue<>();
PartitionRunner partitionRunner = new PartitionRunner(PROCESS_NAME, THREAD_PREFIX, myDaoConfig.getReindexBatchSize(), myDaoConfig.getReindexThreadCount());
partitionRunner.runInPartitionedThreads(new SliceImpl<>(thePidList), pids -> removeLinks(thePidList, goldenPidAggregator));
partitionRunner.runInPartitionedThreads(new SliceImpl<>(thePidList), pids -> removeLinks(pids, goldenPidAggregator));
return new ArrayList<>(goldenPidAggregator);
}

View File

@ -95,7 +95,11 @@ public class PartitionRunner {
private List<Callable<Void>> buildCallableTasks(Slice<Long> theResourceIds, Consumer<List<Long>> partitionConsumer) {
List<Callable<Void>> retval = new ArrayList<>();
ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
if (myBatchSize > theResourceIds.getContent().size()) {
ourLog.info("Splitting batch job of {} entries into chunks of {}", theResourceIds.getContent().size(), myBatchSize);
} else {
ourLog.info("Creating batch job of {} entries", theResourceIds.getContent().size());
}
List<List<Long>> partitions = Lists.partition(theResourceIds.getContent(), myBatchSize);
for (List<Long> nextPartition : partitions) {

View File

@ -0,0 +1,63 @@
package ca.uhn.fhir.jpa.batch.mdm.job;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.dao.data.IMdmLinkDao;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class MdmLinkDeleterTest {
@Mock
private DaoConfig myDaoConfig;
@Mock
private PlatformTransactionManager myPlatformTransactionManager;
@Mock
private IMdmLinkDao myMdmLinkDao;
@Captor
ArgumentCaptor<List<Long>> myPidListCaptor;
@InjectMocks
private MdmLinkDeleter myMdmLinkDeleter;
@Test
public void testMdmLinkDeleterSplitsPidList() throws Exception {
int threadCount = 4;
int batchSize = 5;
when(myDaoConfig.getReindexBatchSize()).thenReturn(batchSize);
when(myDaoConfig.getReindexThreadCount()).thenReturn(threadCount);
List<Long> allPidsList = new ArrayList<>();
int count = threadCount*batchSize;
for (long i = 0; i < count; ++i) {
allPidsList.add(i);
}
myMdmLinkDeleter.process(allPidsList);
verify(myMdmLinkDao, times(threadCount)).findAllById(myPidListCaptor.capture());
verify(myMdmLinkDao, times(threadCount)).deleteAll(anyList());
verifyNoMoreInteractions(myMdmLinkDao);
List<List<Long>> pidListList = myPidListCaptor.getAllValues();
assertEquals(threadCount, pidListList.size());
for (List<Long> pidList : pidListList) {
assertEquals(batchSize, pidList.size());
}
}
}