remove parallelism from non-get batch operations

This commit is contained in:
Tadgh 2021-09-14 12:29:43 -04:00
parent bc93403364
commit 7dfe628551

View File

@ -205,7 +205,6 @@ public abstract class BaseTransactionProcessor {
executor.setThreadNamePrefix("bundle_batch_"); executor.setThreadNamePrefix("bundle_batch_");
executor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize()); executor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize());
executor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize()); executor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize());
executor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
executor.initialize(); executor.initialize();
myExecutor = executor; myExecutor = executor;
@ -393,38 +392,47 @@ public abstract class BaseTransactionProcessor {
List<IBase> requestEntries = myVersionAdapter.getEntries(theRequest); List<IBase> requestEntries = myVersionAdapter.getEntries(theRequest);
int requestEntriesSize = requestEntries.size(); int requestEntriesSize = requestEntries.size();
// And execute for each entry in parallel as a mini-transaction in its // Now, run all non-gets sequentially, and all gets are submitted to the executor to run (potentially) in parallel
// own database transaction so that if one fails, it doesn't prevent others. // The result is kept in the map to save the original position
// The result is keep in the map to save the original position List<RetriableBundleTask> getCalls = new ArrayList<>();
List<RetriableBundleTask> nonGetCalls = new ArrayList<>();
CountDownLatch completionLatch = new CountDownLatch(requestEntriesSize); long getEntriesSize = requestEntries.stream().filter(entry -> myVersionAdapter.getEntryRequestVerb(myContext, entry).equalsIgnoreCase("GET")).count();
IBase nextRequestEntry = null; CountDownLatch completionLatch = new CountDownLatch((int) getEntriesSize);
for (int i = 0; i < requestEntriesSize; i++) { for (int i=0; i<requestEntriesSize ; i++ ) {
nextRequestEntry = requestEntries.get(i); IBase nextRequestEntry = requestEntries.get(i);
RetriableBundleTask retriableBundleTask = new RetriableBundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode); RetriableBundleTask retriableBundleTask = new RetriableBundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
getTaskExecutor().execute(retriableBundleTask); if (myVersionAdapter.getEntryRequestVerb(myContext, nextRequestEntry).equalsIgnoreCase("GET")) {
getCalls.add(retriableBundleTask);
} else {
nonGetCalls.add(retriableBundleTask);
}
} }
//Execute all non-gets on calling thread.
nonGetCalls.forEach(RetriableBundleTask::run);
//Execute all gets (potentially in a pool)
getCalls.forEach(getCall -> getTaskExecutor().execute(getCall));
// waiting for all tasks to be completed // waiting for all async tasks to be completed
AsyncUtil.awaitLatchAndIgnoreInterrupt(completionLatch, 300L, TimeUnit.SECONDS); AsyncUtil.awaitLatchAndIgnoreInterrupt(completionLatch, 300L, TimeUnit.SECONDS);
// Now, create the bundle response in original order // Now, create the bundle response in original order
Object nextResponseEntry; Object nextResponseEntry;
for (int i = 0; i < requestEntriesSize; i++) { for (int i=0; i<requestEntriesSize; i++ ) {
nextResponseEntry = responseMap.get(i); nextResponseEntry = responseMap.get(i);
if (nextResponseEntry instanceof BaseServerResponseExceptionHolder) { if (nextResponseEntry instanceof BaseServerResponseExceptionHolder) {
BaseServerResponseExceptionHolder caughtEx = (BaseServerResponseExceptionHolder) nextResponseEntry; BaseServerResponseExceptionHolder caughtEx = (BaseServerResponseExceptionHolder)nextResponseEntry;
if (caughtEx.getException() != null) { if (caughtEx.getException() != null) {
IBase nextEntry = myVersionAdapter.addEntry(response); IBase nextEntry = myVersionAdapter.addEntry(response);
populateEntryWithOperationOutcome(caughtEx.getException(), nextEntry); populateEntryWithOperationOutcome(caughtEx.getException(), nextEntry);
myVersionAdapter.setResponseStatus(nextEntry, toStatusString(caughtEx.getException().getStatusCode())); myVersionAdapter.setResponseStatus(nextEntry, toStatusString(caughtEx.getException().getStatusCode()));
} }
} else { } else {
myVersionAdapter.addEntry(response, (IBase) nextResponseEntry); myVersionAdapter.addEntry(response, (IBase)nextResponseEntry);
} }
} }
long delay = System.currentTimeMillis() - start; long delay = System.currentTimeMillis() - start;
ourLog.info("Batch completed in {}ms", delay); ourLog.info("Batch completed in {}ms", delay);