Swap to SyncTaskExecutor

This commit is contained in:
Tadgh 2021-08-31 11:33:06 -04:00
parent faf4dfd056
commit c9f5dfd6d3
1 changed files with 18 additions and 18 deletions

View File

@ -94,6 +94,8 @@ import org.hl7.fhir.r4.model.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@ -153,7 +155,7 @@ public abstract class BaseTransactionProcessor {
@Autowired
private InMemoryResourceMatcher myInMemoryResourceMatcher;
private ThreadPoolTaskExecutor myExecutor ;
private TaskExecutor myExecutor ;
@VisibleForTesting
public void setDaoConfig(DaoConfig theDaoConfig) {
@ -172,13 +174,19 @@ public abstract class BaseTransactionProcessor {
@PostConstruct
public void start() {
ourLog.trace("Starting transaction processor");
myExecutor = new ThreadPoolTaskExecutor();
myExecutor.setThreadNamePrefix("bundle_batch_");
myExecutor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize());
myExecutor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize());
myExecutor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
if (myDaoConfig.getBundleBatchPoolSize() > 1) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("bundle_batch_");
executor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize());
executor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize());
executor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
executor.initialize();
myExecutor = executor;
myExecutor.initialize();
} else {
SyncTaskExecutor executor = new SyncTaskExecutor();
myExecutor = executor;
}
}
public <BUNDLE extends IBaseBundle> BUNDLE transaction(RequestDetails theRequestDetails, BUNDLE theRequest, boolean theNestedMode) {
@ -346,12 +354,7 @@ public abstract class BaseTransactionProcessor {
for (int i=0; i<requestEntriesSize; i++ ) {
nextRequestEntry = requestEntries.get(i);
BundleTask bundleTask = new BundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
//Don't spin up a new thread for nothing if batch size is <=1
if (myDaoConfig.getBundleBatchPoolSize() <= 1) {
bundleTask.call();
} else {
myExecutor.submit(bundleTask);
}
myExecutor.execute(bundleTask);
}
// waiting for all tasks to be completed
@ -1556,7 +1559,7 @@ public abstract class BaseTransactionProcessor {
return theStatusCode + " " + defaultString(Constants.HTTP_STATUS_NAMES.get(theStatusCode));
}
public class BundleTask implements Callable<Void> {
public class BundleTask implements Runnable {
private CountDownLatch myCompletedLatch;
private RequestDetails myRequestDetails;
@ -1575,10 +1578,8 @@ public abstract class BaseTransactionProcessor {
}
@Override
public Void call() {
public void run() {
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
try {
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
myVersionAdapter.addEntry(subRequestBundle, (IBase) myNextReqEntry);
@ -1611,7 +1612,6 @@ public abstract class BaseTransactionProcessor {
// checking for the parallelism
ourLog.debug("processing bacth for {} is completed", myVersionAdapter.getEntryRequestUrl((IBase)myNextReqEntry));
myCompletedLatch.countDown();
return null;
}
}
}