Merge branch 'master' into double-conditionalCreateIssue

This commit is contained in:
Tadgh 2021-08-31 14:25:51 -04:00
commit d75af14d29
3 changed files with 35 additions and 19 deletions

View File

@ -94,6 +94,8 @@ import org.hl7.fhir.r4.model.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -153,8 +155,8 @@ public abstract class BaseTransactionProcessor {
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
private ThreadPoolTaskExecutor myExecutor ;
private TaskExecutor myExecutor ;
@VisibleForTesting
public void setDaoConfig(DaoConfig theDaoConfig) {
myDaoConfig = theDaoConfig;
@ -172,16 +174,25 @@ public abstract class BaseTransactionProcessor {
@PostConstruct
public void start() {
ourLog.trace("Starting transaction processor");
myExecutor = new ThreadPoolTaskExecutor();
myExecutor.setThreadNamePrefix("bundle_batch_");
// For single thread set the value to 1
//myExecutor.setCorePoolSize(1);
//myExecutor.setMaxPoolSize(1);
myExecutor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize());
myExecutor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize());
myExecutor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
}
myExecutor.initialize();
private TaskExecutor getTaskExecutor() {
if (myExecutor == null) {
if (myDaoConfig.getBundleBatchPoolSize() > 1) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("bundle_batch_");
executor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize());
executor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize());
executor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
executor.initialize();
myExecutor = executor;
} else {
SyncTaskExecutor executor = new SyncTaskExecutor();
myExecutor = executor;
}
}
return myExecutor;
}
public <BUNDLE extends IBaseBundle> BUNDLE transaction(RequestDetails theRequestDetails, BUNDLE theRequest, boolean theNestedMode) {
@ -349,7 +360,7 @@ public abstract class BaseTransactionProcessor {
for (int i=0; i<requestEntriesSize; i++ ) {
nextRequestEntry = requestEntries.get(i);
BundleTask bundleTask = new BundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
myExecutor.submit(bundleTask);
getTaskExecutor().execute(bundleTask);
}
// waiting for all tasks to be completed
@ -1554,10 +1565,10 @@ public abstract class BaseTransactionProcessor {
return theStatusCode + " " + defaultString(Constants.HTTP_STATUS_NAMES.get(theStatusCode));
}
public class BundleTask implements Callable<Void> {
public class BundleTask implements Runnable {
private CountDownLatch myCompletedLatch;
private ServletRequestDetails myRequestDetails;
private RequestDetails myRequestDetails;
private IBase myNextReqEntry;
private Map<Integer, Object> myResponseMap;
private int myResponseOrder;
@ -1565,7 +1576,7 @@ public abstract class BaseTransactionProcessor {
protected BundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) {
this.myCompletedLatch = theCompletedLatch;
this.myRequestDetails = (ServletRequestDetails)theRequestDetails;
this.myRequestDetails = theRequestDetails;
this.myNextReqEntry = theNextReqEntry;
this.myResponseMap = theResponseMap;
this.myResponseOrder = theResponseOrder;
@ -1573,10 +1584,8 @@ public abstract class BaseTransactionProcessor {
}
@Override
public Void call() {
public void run() {
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
try {
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
myVersionAdapter.addEntry(subRequestBundle, (IBase) myNextReqEntry);
@ -1609,7 +1618,6 @@ public abstract class BaseTransactionProcessor {
// checking for the parallelism
ourLog.debug("processing bacth for {} is completed", myVersionAdapter.getEntryRequestUrl((IBase)myNextReqEntry));
myCompletedLatch.countDown();
return null;
}
}
}

View File

@ -116,12 +116,16 @@ public class FhirSystemDaoDstu3Test extends BaseJpaDstu3SystemTest {
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
myDaoConfig.setIndexMissingFields(new DaoConfig().getIndexMissingFields());
myDaoConfig.setMaximumDeleteConflictQueryCount(new DaoConfig().getMaximumDeleteConflictQueryCount());
myDaoConfig.setBundleBatchPoolSize(new DaoConfig().getBundleBatchPoolSize());
myDaoConfig.setBundleBatchMaxPoolSize(new DaoConfig().getBundleBatchMaxPoolSize());
}
@BeforeEach
public void beforeDisableResultReuse() {
myDaoConfig.setReuseCachedSearchResultsForMillis(null);
myDaoConfig.setBundleBatchPoolSize(1);
myDaoConfig.setBundleBatchMaxPoolSize(1);
}
private Bundle createInputTransactionWithPlaceholderIdInMatchUrl(HTTPVerb theVerb) {

View File

@ -117,12 +117,16 @@ public class FhirSystemDaoR4Test extends BaseJpaR4SystemTest {
myDaoConfig.setAllowInlineMatchUrlReferences(false);
myDaoConfig.setAllowMultipleDelete(new DaoConfig().isAllowMultipleDelete());
myModelConfig.setNormalizedQuantitySearchLevel(NormalizedQuantitySearchLevel.NORMALIZED_QUANTITY_SEARCH_NOT_SUPPORTED);
myDaoConfig.setBundleBatchPoolSize(new DaoConfig().getBundleBatchPoolSize());
myDaoConfig.setBundleBatchMaxPoolSize(new DaoConfig().getBundleBatchMaxPoolSize());
}
@BeforeEach
public void beforeDisableResultReuse() {
myInterceptorRegistry.registerInterceptor(myInterceptor);
myDaoConfig.setReuseCachedSearchResultsForMillis(null);
myDaoConfig.setBundleBatchPoolSize(1);
myDaoConfig.setBundleBatchMaxPoolSize(1);
}
private Bundle createInputTransactionWithPlaceholderIdInMatchUrl(HTTPVerb theVerb) {