Process bundle batch in parallel (#2905)

* Process batch in parallel

* Added a test case

* Added more test cases

* Default the bundle batch to single thread.

* Tried transaction before multi-thread.

* Updated the test cases

* Update based on review comments and add changelog

* Updated the changelog

* Restore QueueCapacity
This commit is contained in:
Frank Tao 2021-08-20 21:14:08 -04:00 committed by GitHub
parent bb0552d0db
commit 4c2ae513d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 379 additions and 51 deletions

View File

@ -0,0 +1,5 @@
---
type: add
issue: 2836
jira: SMILE-2197
title: "FHIR bundle batch is now processed in parallel by default and is configurable by DaoConfig."

View File

@ -258,6 +258,17 @@ public class DaoConfig {
private boolean myAccountForDateIndexNulls;
private boolean myTriggerSubscriptionsForNonVersioningChanges;
/**
* Default core size of the thread pool used to process the entries of a bundle batch.
* A value of 1 means single-threaded processing.
*
* @since 5.6.0
*/
public static final int DEFAULT_BUNDLE_BATCH_POOL_SIZE = 20; // 1 for single thread
/**
* Default maximum size of the thread pool used to process the entries of a bundle batch.
* A value of 1 means single-threaded processing.
*
* @since 5.6.0
*/
public static final int DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE = 100; // 1 for single thread
/**
* Queue capacity used for the bundle batch thread pool.
*
* @since 5.6.0
*/
public static final int DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY = 200;
// Configured core pool size for bundle batch processing; starts at the default
private Integer myBundleBatchPoolSize = DEFAULT_BUNDLE_BATCH_POOL_SIZE;
// Configured maximum pool size for bundle batch processing; starts at the default
private Integer myBundleBatchMaxPoolSize = DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE;
/**
* Constructor
*/
@ -2570,6 +2581,44 @@ public class DaoConfig {
myTriggerSubscriptionsForNonVersioningChanges = theTriggerSubscriptionsForNonVersioningChanges;
}
/**
* Get the size of the thread pool used to process bundle batch entries.
* The default is {@link #DEFAULT_BUNDLE_BATCH_POOL_SIZE}.
*
* @return the configured bundle batch pool size
* @since 5.6.0
*/
public Integer getBundleBatchPoolSize() {
return myBundleBatchPoolSize;
}
/**
* Set the size of the thread pool used to process bundle batch entries.
* The default is {@link #DEFAULT_BUNDLE_BATCH_POOL_SIZE}.
* Set the pool size to 1 for single-threaded processing.
*
* @param theBundleBatchPoolSize the bundle batch pool size to use
* @since 5.6.0
*/
public void setBundleBatchPoolSize(Integer theBundleBatchPoolSize) {
this.myBundleBatchPoolSize = theBundleBatchPoolSize;
}
/**
* Get the maximum size of the thread pool used to process bundle batch entries.
* The default is {@link #DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE}.
*
* @return the configured bundle batch maximum pool size
* @since 5.6.0
*/
public Integer getBundleBatchMaxPoolSize() {
return myBundleBatchMaxPoolSize;
}
/**
* Set the maximum size of the thread pool used to process bundle batch entries.
* The default is {@link #DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE}.
* Set the maximum pool size to 1 for single-threaded processing.
*
* @param theBundleBatchMaxPoolSize the bundle batch maximum pool size to use
* @since 5.6.0
*/
public void setBundleBatchMaxPoolSize(Integer theBundleBatchMaxPoolSize) {
this.myBundleBatchMaxPoolSize = theBundleBatchMaxPoolSize;
}
/**
 * Reports whether all three settings consulted here are switched on.
 *
 * @return {@code true} only when {@link #isAllowMultipleDelete()}, {@link #isExpungeEnabled()}
 * and {@link #isDeleteExpungeEnabled()} all return {@code true}
 */
public boolean canDeleteExpunge() {
	// Checks run in the same order as before, stopping at the first one that is off
	if (!isAllowMultipleDelete()) {
		return false;
	}
	if (!isExpungeEnabled()) {
		return false;
	}
	return isDeleteExpungeEnabled();
}

View File

@ -72,6 +72,7 @@ import ca.uhn.fhir.util.ElementUtil;
import ca.uhn.fhir.util.FhirTerser;
import ca.uhn.fhir.util.ResourceReferenceInfo;
import ca.uhn.fhir.util.StopWatch;
import ca.uhn.fhir.util.AsyncUtil;
import ca.uhn.fhir.util.UrlUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
@ -92,6 +93,7 @@ import org.hl7.fhir.instance.model.api.IPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
@ -112,6 +114,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static ca.uhn.fhir.util.StringUtil.toUtf8String;
@ -146,6 +152,8 @@ public abstract class BaseTransactionProcessor {
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
private ThreadPoolTaskExecutor myExecutor ;
@VisibleForTesting
public void setDaoConfig(DaoConfig theDaoConfig) {
myDaoConfig = theDaoConfig;
@ -163,6 +171,16 @@ public abstract class BaseTransactionProcessor {
/**
 * Starts the transaction processor and creates the executor that runs bundle batch entries.
 * <p>
 * The core and maximum pool sizes are read from {@link DaoConfig}. Both getters return a boxed
 * {@code Integer}, so a {@code null} value falls back to the corresponding default instead of
 * failing with a {@code NullPointerException} on unboxing.
 * </p>
 */
@PostConstruct
public void start() {
	ourLog.trace("Starting transaction processor");

	Integer configuredPoolSize = myDaoConfig.getBundleBatchPoolSize();
	Integer configuredMaxPoolSize = myDaoConfig.getBundleBatchMaxPoolSize();
	int corePoolSize = configuredPoolSize != null ? configuredPoolSize : DaoConfig.DEFAULT_BUNDLE_BATCH_POOL_SIZE;
	int maxPoolSize = configuredMaxPoolSize != null ? configuredMaxPoolSize : DaoConfig.DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE;

	// NOTE(review): the pool is sized here, at post-construct time; confirm whether later
	// changes to the DaoConfig values are expected to resize it.
	// NOTE(review): no shutdown of this executor is visible in this view; confirm that it
	// is released when the application context closes.
	myExecutor = new ThreadPoolTaskExecutor();
	myExecutor.setThreadNamePrefix("bundle_batch_");
	// A core and maximum pool size of 1 gives single-threaded batch processing
	myExecutor.setCorePoolSize(corePoolSize);
	myExecutor.setMaxPoolSize(maxPoolSize);
	myExecutor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
	myExecutor.initialize();
}
public <BUNDLE extends IBaseBundle> BUNDLE transaction(RequestDetails theRequestDetails, BUNDLE theRequest, boolean theNestedMode) {
@ -309,59 +327,54 @@ public abstract class BaseTransactionProcessor {
/**
* Processes a batch bundle and returns a batch-response bundle.
* <p>
* In the parallel path each request entry is wrapped in a {@link BundleTask} and submitted to
* {@code myExecutor}. Every task stores its outcome (a response entry or an exception holder)
* in {@code responseMap} under the entry's index. After waiting on the completion latch, the
* response bundle is assembled by walking the indexes in order, so the response entries follow
* the order of the request entries.
* </p>
* <p>
* NOTE(review): this span appears to interleave lines of the earlier sequential loop with the
* parallel implementation; the description above covers the parallel path only.
* </p>
*
* @param theRequestDetails the request details handed to every sub-request
* @param theRequest the batch bundle whose entries are processed
* @param theNestedMode passed through unchanged to each sub-request
* @return the bundle holding one response entry per request entry
*/
private IBaseBundle batch(final RequestDetails theRequestDetails, IBaseBundle theRequest, boolean theNestedMode) {
ourLog.info("Beginning batch with {} resources", myVersionAdapter.getEntries(theRequest).size());
long start = System.currentTimeMillis();
TransactionTemplate txTemplate = new TransactionTemplate(myTxManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
IBaseBundle resp = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.BATCHRESPONSE.toCode());
IBaseBundle response = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.BATCHRESPONSE.toCode());
// Keyed by request entry index; concurrent because the tasks write to it from pool threads
Map<Integer, Object> responseMap = new ConcurrentHashMap<>();
/*
* For batch, we handle each entry as a mini-transaction in its own database transaction so that if one fails, it doesn't prevent others
*/
List<IBase> requestEntries = myVersionAdapter.getEntries(theRequest);
int requestEntriesSize = requestEntries.size();
for (final Object nextRequestEntry : myVersionAdapter.getEntries(theRequest)) {
// Each entry is executed in parallel as a mini-transaction in its
// own database transaction so that if one fails, it doesn't prevent others.
// The result is kept in the map to preserve the original position
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
// One count per request entry; each BundleTask counts down once when it finishes
CountDownLatch completionLatch = new CountDownLatch(requestEntriesSize);
IBase nextRequestEntry = null;
for (int i=0; i<requestEntriesSize; i++ ) {
nextRequestEntry = requestEntries.get(i);
BundleTask bundleTask = new BundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
// NOTE(review): the executor is created with a bounded queue in start(); confirm what
// should happen if a batch has more entries than the pool and queue can accept.
myExecutor.submit(bundleTask);
}
try {
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
myVersionAdapter.addEntry(subRequestBundle, (IBase) nextRequestEntry);
// Wait for all tasks to complete
// NOTE(review): the wait is capped at 300 seconds; an entry whose task has not stored a
// result by then would presumably yield null from responseMap.get(i) below - confirm how
// that case should be reported.
AsyncUtil.awaitLatchAndIgnoreInterrupt(completionLatch, 300L, TimeUnit.SECONDS);
IBaseBundle nextResponseBundle = processTransactionAsSubRequest(theRequestDetails, subRequestBundle, "Batch sub-request", theNestedMode);
// Now, create the bundle response in original order
Object nextResponseEntry;
for (int i=0; i<requestEntriesSize; i++ ) {
IBase subResponseEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
myVersionAdapter.addEntry(resp, subResponseEntry);
/*
* If the individual entry didn't have a resource in its response, bring the sub-transaction's OperationOutcome across so the client can see it
*/
if (myVersionAdapter.getResource(subResponseEntry) == null) {
IBase nextResponseBundleFirstEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
myVersionAdapter.setResource(subResponseEntry, myVersionAdapter.getResource(nextResponseBundleFirstEntry));
nextResponseEntry = responseMap.get(i);
// A stored exception holder means the sub-request failed: report it as an OperationOutcome entry
if (nextResponseEntry instanceof BaseServerResponseExceptionHolder) {
BaseServerResponseExceptionHolder caughtEx = (BaseServerResponseExceptionHolder)nextResponseEntry;
if (caughtEx.getException() != null) {
IBase nextEntry = myVersionAdapter.addEntry(response);
populateEntryWithOperationOutcome(caughtEx.getException(), nextEntry);
myVersionAdapter.setResponseStatus(nextEntry, toStatusString(caughtEx.getException().getStatusCode()));
}
} catch (BaseServerResponseException e) {
caughtEx.setException(e);
} catch (Throwable t) {
ourLog.error("Failure during BATCH sub transaction processing", t);
caughtEx.setException(new InternalErrorException(t));
} else {
// Otherwise the stored value is the sub-request's response entry
myVersionAdapter.addEntry(response, (IBase)nextResponseEntry);
}
if (caughtEx.getException() != null) {
IBase nextEntry = myVersionAdapter.addEntry(resp);
populateEntryWithOperationOutcome(caughtEx.getException(), nextEntry);
myVersionAdapter.setResponseStatus(nextEntry, toStatusString(caughtEx.getException().getStatusCode()));
}
}
long delay = System.currentTimeMillis() - start;
ourLog.info("Batch completed in {}ms", delay);
return resp;
return response;
}
@VisibleForTesting
@ -1544,5 +1557,62 @@ public abstract class BaseTransactionProcessor {
return theStatusCode + " " + defaultString(Constants.HTTP_STATUS_NAMES.get(theStatusCode));
}
/**
 * Unit of work that processes one entry of a batch bundle as its own sub-transaction.
 * <p>
 * The outcome is stored in the shared response map under the entry's position: either the
 * response entry of the sub-transaction, or a {@link BaseServerResponseExceptionHolder}
 * wrapping the failure. The completion latch is counted down exactly once per task, whatever
 * the outcome.
 * </p>
 */
public class BundleTask implements Callable<Void> {

	private final CountDownLatch myCompletedLatch;
	// Kept as the general RequestDetails type: the sub-request call accepts it as-is, and
	// narrowing it to a servlet-specific type would fail for non-servlet request details.
	private final RequestDetails myRequestDetails;
	private final IBase myNextReqEntry;
	private final Map<Integer, Object> myResponseMap;
	private final int myResponseOrder;
	private final boolean myNestedMode;

	/**
	 * @param theCompletedLatch latch counted down once when this task finishes
	 * @param theRequestDetails request details passed on to the sub-request
	 * @param theResponseMap shared map receiving this task's outcome
	 * @param theResponseOrder position of the entry in the request; used as the map key
	 * @param theNextReqEntry the request entry to process
	 * @param theNestedMode passed through unchanged to the sub-request
	 */
	protected BundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) {
		myCompletedLatch = theCompletedLatch;
		myRequestDetails = theRequestDetails;
		myNextReqEntry = theNextReqEntry;
		myResponseMap = theResponseMap;
		myResponseOrder = theResponseOrder;
		myNestedMode = theNestedMode;
	}

	/**
	 * Runs the entry as a single-entry transaction bundle and records the outcome.
	 *
	 * @return always {@code null}
	 */
	@Override
	public Void call() {
		try {
			BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
			try {
				IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
				myVersionAdapter.addEntry(subRequestBundle, (IBase) myNextReqEntry);

				IBaseBundle nextResponseBundle = processTransactionAsSubRequest(myRequestDetails, subRequestBundle, "Batch sub-request", myNestedMode);

				IBase subResponseEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
				myResponseMap.put(myResponseOrder, subResponseEntry);

				/*
				 * If the individual entry didn't have a resource in its response, bring the sub-transaction's OperationOutcome across so the client can see it
				 */
				if (myVersionAdapter.getResource(subResponseEntry) == null) {
					IBase nextResponseBundleFirstEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
					myResponseMap.put(myResponseOrder, nextResponseBundleFirstEntry);
				}
			} catch (BaseServerResponseException e) {
				caughtEx.setException(e);
			} catch (Throwable t) {
				ourLog.error("Failure during BATCH sub transaction processing", t);
				caughtEx.setException(new InternalErrorException(t));
			}

			if (caughtEx.getException() != null) {
				// add exception to the response map
				myResponseMap.put(myResponseOrder, caughtEx);
			}

			// checking for the parallelism
			ourLog.debug("processing bacth for {} is completed", myVersionAdapter.getEntryRequestUrl((IBase) myNextReqEntry));
		} finally {
			// Always release the latch, even if recording or logging the outcome fails, so
			// the thread waiting on the batch is not held until the timeout.
			myCompletedLatch.countDown();
		}
		return null;
	}
}
}

View File

@ -1,22 +1,50 @@
package ca.uhn.fhir.jpa.provider.r4;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.server.exceptions.NotImplementedOperationException;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.List;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.Bundle.HTTPVerb;
import org.hl7.fhir.r4.model.Condition;
import org.hl7.fhir.r4.model.Enumerations.AdministrativeGender;
import org.hl7.fhir.r4.model.OperationOutcome;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Patient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import ca.uhn.fhir.jpa.api.config.DaoConfig;
import ca.uhn.fhir.jpa.model.util.JpaConstants;
import ca.uhn.fhir.rest.server.exceptions.NotImplementedOperationException;
public class ResourceProviderR4BundleTest extends BaseResourceProviderR4Test {
private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(ResourceProviderR4BundleTest.class);
/**
 * Runs the parent set-up, then configures a multi-threaded bundle batch pool.
 * The sizes are explicit literals rather than references to the DaoConfig defaults.
 */
@BeforeEach
@Override
public void before() throws Exception {
	super.before();

	final int corePoolSize = 20;
	final int maxPoolSize = 100;
	myDaoConfig.setBundleBatchPoolSize(corePoolSize);
	myDaoConfig.setBundleBatchMaxPoolSize(maxPoolSize);
}
/**
 * Runs the parent tear-down and restores the bundle batch pool settings to their defaults.
 * The reset sits in a {@code finally} block so that a failing parent tear-down cannot leave
 * modified pool sizes behind for later tests.
 */
@AfterEach
@Override
public void after() throws Exception {
	try {
		super.after();
	} finally {
		myDaoConfig.setBundleBatchPoolSize(DaoConfig.DEFAULT_BUNDLE_BATCH_POOL_SIZE);
		myDaoConfig.setBundleBatchMaxPoolSize(DaoConfig.DEFAULT_BUNDLE_BATCH_MAX_POOL_SIZE);
	}
}
/**
* See #401
*/
@ -58,5 +86,181 @@ public class ResourceProviderR4BundleTest extends BaseResourceProviderR4Test {
}
/**
 * Reads 50 patients through one batch bundle and checks that the response entries come back
 * in the same order as the request entries.
 */
@Test
public void testBundleBatch() {
	List<String> patientIds = createPatients(50);

	Bundle batch = new Bundle();
	batch.setType(BundleType.BATCH);
	for (String patientId : patientIds) {
		batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientId);
	}

	Bundle result = myClient.transaction().withBundle(batch).execute();

	assertEquals(50, result.getEntry().size());

	// Response entry N must hold the patient requested by request entry N
	List<BundleEntryComponent> resultEntries = result.getEntry();
	for (int position = 0; position < resultEntries.size(); position++) {
		String returnedId = resultEntries.get(position).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString();
		assertEquals(patientIds.get(position), returnedId);
	}
}
/**
 * Same scenario as the 50-patient batch read, with the pool sizes set to 1 beforehand.
 */
@Test
public void testBundleBatchWithSingleThread() {
	List<String> patientIds = createPatients(50);

	// NOTE(review): the batch executor appears to be sized once in
	// BaseTransactionProcessor.start(); confirm these settings take effect for this request.
	myDaoConfig.setBundleBatchPoolSize(1);
	myDaoConfig.setBundleBatchMaxPoolSize(1);

	Bundle batch = new Bundle();
	batch.setType(BundleType.BATCH);
	for (String patientId : patientIds) {
		batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientId);
	}

	Bundle result = myClient.transaction().withBundle(batch).execute();

	assertEquals(50, result.getEntry().size());

	// Response entry N must hold the patient requested by request entry N
	List<BundleEntryComponent> resultEntries = result.getEntry();
	for (int position = 0; position < resultEntries.size(); position++) {
		String returnedId = resultEntries.get(position).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString();
		assertEquals(patientIds.get(position), returnedId);
	}
}
/**
 * Mixes reads of existing patients with reads of ids that do not exist, and checks that each
 * response entry (resource or error outcome) sits at the position of its request entry.
 */
@Test
public void testBundleBatchWithError() {
	List<String> patientIds = createPatients(5);

	String[] requestUrls = {
		patientIds.get(0),
		"Patient/1000", // not exist
		patientIds.get(1),
		patientIds.get(2),
		"Patient/2000", // not exist
		patientIds.get(3),
		"Patient/3000", // not exist
		patientIds.get(4)
	};

	Bundle batch = new Bundle();
	batch.setType(BundleType.BATCH);
	for (String requestUrl : requestUrls) {
		batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(requestUrl);
	}

	Bundle result = myClient.transaction().withBundle(batch).execute();

	assertEquals(8, result.getEntry().size());
	List<BundleEntryComponent> resultEntries = result.getEntry();

	// position 0: patient 1
	assertEquals(patientIds.get(0), resultEntries.get(0).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 1: Patient/1000 - error outcome
	OperationOutcome firstOutcome = (OperationOutcome) resultEntries.get(1).getResponse().getOutcome();
	assertThat(firstOutcome.getIssueFirstRep().getDiagnostics(), containsString("Patient/1000"));

	// position 2: patient 2
	assertEquals(patientIds.get(1), resultEntries.get(2).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 3: patient 3
	assertEquals(patientIds.get(2), resultEntries.get(3).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 4: Patient/2000 - error outcome
	OperationOutcome secondOutcome = (OperationOutcome) resultEntries.get(4).getResponse().getOutcome();
	assertThat(secondOutcome.getIssueFirstRep().getDiagnostics(), containsString("Patient/2000"));

	// position 5: patient 4
	assertEquals(patientIds.get(3), resultEntries.get(5).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 6: Patient/3000 - error outcome
	OperationOutcome thirdOutcome = (OperationOutcome) resultEntries.get(6).getResponse().getOutcome();
	assertThat(thirdOutcome.getIssueFirstRep().getDiagnostics(), containsString("Patient/3000"));

	// position 7: patient 5
	assertEquals(patientIds.get(4), resultEntries.get(7).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());
}
/**
 * Mixes reads of existing patients with two creates (a Patient and a Condition) and checks
 * that every response entry sits at the position of its request entry.
 */
@Test
public void testBundleBatchWithCreate() {
	List<String> patientIds = createPatients(5);

	Patient newPatient = new Patient();
	newPatient.setId("100");
	newPatient.setGender(AdministrativeGender.MALE);
	newPatient.addIdentifier().setSystem("urn:foo").setValue("A");
	newPatient.addName().setFamily("Smith");

	Condition newCondition = new Condition();
	newCondition.getSubject().setReference(patientIds.get(0));

	Bundle batch = new Bundle();
	batch.setType(BundleType.BATCH);
	// position 0: read
	batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientIds.get(0));
	// position 1: create patient
	batch.addEntry().setResource(newPatient).getRequest().setMethod(HTTPVerb.POST);
	// positions 2-3: reads
	batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientIds.get(1));
	batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientIds.get(2));
	// position 4: create condition
	batch.addEntry().setResource(newCondition).getRequest().setMethod(HTTPVerb.POST);
	// positions 5-6: reads
	batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientIds.get(3));
	batch.addEntry().getRequest().setMethod(HTTPVerb.GET).setUrl(patientIds.get(4));

	Bundle result = myClient.transaction().withBundle(batch).execute();

	assertEquals(7, result.getEntry().size());
	List<BundleEntryComponent> resultEntries = result.getEntry();

	// position 0: patient 1
	assertEquals(patientIds.get(0), resultEntries.get(0).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 1: patient create
	assertThat(resultEntries.get(1).getResponse().getStatus(), containsString("201"));

	// position 2: patient 2
	assertEquals(patientIds.get(1), resultEntries.get(2).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 3: patient 3
	assertEquals(patientIds.get(2), resultEntries.get(3).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 4: condition create
	assertThat(resultEntries.get(4).getResponse().getStatus(), containsString("201"));

	// position 5: patient 4
	assertEquals(patientIds.get(3), resultEntries.get(5).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());

	// position 6: patient 5
	assertEquals(patientIds.get(4), resultEntries.get(6).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());
}
/**
 * Creates the requested number of patients and returns their ids in creation order.
 * <p>
 * Each patient gets a one-letter family name taken from a 52-letter alphabet. The letter index
 * wraps around, so counts above 52 reuse letters instead of running off the end of the string.
 * </p>
 *
 * @param count number of patients to create
 * @return the unqualified, versionless ids of the created patients, in creation order
 */
private List<String> createPatients(int count) {
	final String familyNameLetters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

	List<String> ids = new ArrayList<>();
	for (int i = 0; i < count; i++) {
		// Identical to the plain index for the first 52 patients; wraps afterwards
		int letterIndex = i % familyNameLetters.length();

		Patient patient = new Patient();
		patient.setGender(AdministrativeGender.MALE);
		patient.addIdentifier().setSystem("urn:foo").setValue("A");
		patient.addName().setFamily(familyNameLetters.substring(letterIndex, letterIndex + 1));

		String id = myPatientDao.create(patient).getId().toUnqualifiedVersionless().getValue();
		ids.add(id);
	}
	return ids;
}
}