Merge pull request #3038 from hapifhir/remove-parallel-non-gets
Remove parallel non gets
This commit is contained in:
commit
53a2d3c1ac
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
type: fix
|
||||
title: "Fixed a bug in processing large batch requests containing many modifying entries. PUT/POST/DELETE operations now occur sequentially,
|
||||
instead of in parallel."
|
|
@ -5,9 +5,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.hl7.fhir.instance.model.api.IIdType;
|
||||
import org.hl7.fhir.r4.model.Bundle;
|
||||
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
|
||||
|
@ -110,6 +116,22 @@ public class ResourceProviderR4BundleTest extends BaseResourceProviderR4Test {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHighConcurrencyWorks() throws IOException, InterruptedException {
|
||||
List<Bundle> bundles = new ArrayList<>();
|
||||
for (int i =0 ; i < 10; i ++) {
|
||||
bundles.add(myFhirCtx.newJsonParser().parseResource(Bundle.class, IOUtils.toString(getClass().getResourceAsStream("/r4/identical-tags-batch.json"), Charsets.UTF_8)));
|
||||
}
|
||||
|
||||
ExecutorService tpe = Executors.newFixedThreadPool(4);
|
||||
for (Bundle bundle :bundles) {
|
||||
tpe.execute(() -> myClient.transaction().withBundle(bundle).execute());
|
||||
}
|
||||
tpe.shutdown();
|
||||
tpe.awaitTermination(100, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testBundleBatchWithSingleThread() {
|
||||
List<String> ids = createPatients(50);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -205,7 +205,6 @@ public abstract class BaseTransactionProcessor {
|
|||
executor.setThreadNamePrefix("bundle_batch_");
|
||||
executor.setCorePoolSize(myDaoConfig.getBundleBatchPoolSize());
|
||||
executor.setMaxPoolSize(myDaoConfig.getBundleBatchMaxPoolSize());
|
||||
executor.setQueueCapacity(DaoConfig.DEFAULT_BUNDLE_BATCH_QUEUE_CAPACITY);
|
||||
executor.initialize();
|
||||
myExecutor = executor;
|
||||
|
||||
|
@ -393,38 +392,46 @@ public abstract class BaseTransactionProcessor {
|
|||
List<IBase> requestEntries = myVersionAdapter.getEntries(theRequest);
|
||||
int requestEntriesSize = requestEntries.size();
|
||||
|
||||
// And execute for each entry in parallel as a mini-transaction in its
|
||||
// own database transaction so that if one fails, it doesn't prevent others.
|
||||
// The result is keep in the map to save the original position
|
||||
// Now, run all non-gets sequentially, and all gets are submitted to the executor to run (potentially) in parallel
|
||||
// The result is kept in the map to save the original position
|
||||
List<RetriableBundleTask> getCalls = new ArrayList<>();
|
||||
List<RetriableBundleTask> nonGetCalls = new ArrayList<>();
|
||||
|
||||
CountDownLatch completionLatch = new CountDownLatch(requestEntriesSize);
|
||||
IBase nextRequestEntry = null;
|
||||
for (int i = 0; i < requestEntriesSize; i++) {
|
||||
nextRequestEntry = requestEntries.get(i);
|
||||
for (int i=0; i< requestEntriesSize ; i++ ) {
|
||||
IBase nextRequestEntry = requestEntries.get(i);
|
||||
RetriableBundleTask retriableBundleTask = new RetriableBundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
|
||||
getTaskExecutor().execute(retriableBundleTask);
|
||||
if (myVersionAdapter.getEntryRequestVerb(myContext, nextRequestEntry).equalsIgnoreCase("GET")) {
|
||||
getCalls.add(retriableBundleTask);
|
||||
} else {
|
||||
nonGetCalls.add(retriableBundleTask);
|
||||
}
|
||||
}
|
||||
//Execute all non-gets on calling thread.
|
||||
nonGetCalls.forEach(RetriableBundleTask::run);
|
||||
//Execute all gets (potentially in a pool)
|
||||
getCalls.forEach(getCall -> getTaskExecutor().execute(getCall));
|
||||
|
||||
// waiting for all tasks to be completed
|
||||
// waiting for all async tasks to be completed
|
||||
AsyncUtil.awaitLatchAndIgnoreInterrupt(completionLatch, 300L, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
// Now, create the bundle response in original order
|
||||
Object nextResponseEntry;
|
||||
for (int i = 0; i < requestEntriesSize; i++) {
|
||||
|
||||
for (int i=0; i<requestEntriesSize; i++ ) {
|
||||
|
||||
nextResponseEntry = responseMap.get(i);
|
||||
if (nextResponseEntry instanceof BaseServerResponseExceptionHolder) {
|
||||
BaseServerResponseExceptionHolder caughtEx = (BaseServerResponseExceptionHolder) nextResponseEntry;
|
||||
BaseServerResponseExceptionHolder caughtEx = (BaseServerResponseExceptionHolder)nextResponseEntry;
|
||||
if (caughtEx.getException() != null) {
|
||||
IBase nextEntry = myVersionAdapter.addEntry(response);
|
||||
populateEntryWithOperationOutcome(caughtEx.getException(), nextEntry);
|
||||
myVersionAdapter.setResponseStatus(nextEntry, toStatusString(caughtEx.getException().getStatusCode()));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
myVersionAdapter.addEntry(response, (IBase) nextResponseEntry);
|
||||
myVersionAdapter.addEntry(response, (IBase)nextResponseEntry);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
long delay = System.currentTimeMillis() - start;
|
||||
ourLog.info("Batch completed in {}ms", delay);
|
||||
|
||||
|
|
Loading…
Reference in New Issue