Merge pull request #2978 from hapifhir/fix-batch-pool-breaking
Fix bundle batch pool bug when identical tags are present.
This commit is contained in:
commit
9a1d9d091c
|
@ -0,0 +1,3 @@
|
|||
---
|
||||
type: fix
|
||||
title: "Fixed a bug where two identical tags in parallel entries being created in a batch would fail."
|
|
@ -366,8 +366,8 @@ public abstract class BaseTransactionProcessor {
|
|||
IBase nextRequestEntry = null;
|
||||
for (int i=0; i<requestEntriesSize; i++ ) {
|
||||
nextRequestEntry = requestEntries.get(i);
|
||||
BundleTask bundleTask = new BundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
|
||||
getTaskExecutor().execute(bundleTask);
|
||||
RetriableBundleTask retriableBundleTask = new RetriableBundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
|
||||
getTaskExecutor().execute(retriableBundleTask);
|
||||
}
|
||||
|
||||
// waiting for all tasks to be completed
|
||||
|
@ -1683,7 +1683,7 @@ public abstract class BaseTransactionProcessor {
|
|||
return theStatusCode + " " + defaultString(Constants.HTTP_STATUS_NAMES.get(theStatusCode));
|
||||
}
|
||||
|
||||
public class BundleTask implements Runnable {
|
||||
public class RetriableBundleTask implements Runnable {
|
||||
|
||||
private final CountDownLatch myCompletedLatch;
|
||||
private final RequestDetails myRequestDetails;
|
||||
|
@ -1691,51 +1691,73 @@ public abstract class BaseTransactionProcessor {
|
|||
private final Map<Integer, Object> myResponseMap;
|
||||
private final int myResponseOrder;
|
||||
private final boolean myNestedMode;
|
||||
private BaseServerResponseException myLastSeenException;
|
||||
|
||||
protected BundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) {
|
||||
protected RetriableBundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) {
|
||||
this.myCompletedLatch = theCompletedLatch;
|
||||
this.myRequestDetails = theRequestDetails;
|
||||
this.myNextReqEntry = theNextReqEntry;
|
||||
this.myResponseMap = theResponseMap;
|
||||
this.myResponseOrder = theResponseOrder;
|
||||
this.myNestedMode = theNestedMode;
|
||||
this.myLastSeenException = null;
|
||||
}
|
||||
|
||||
|
||||
private void processBatchEntry() {
|
||||
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
|
||||
myVersionAdapter.addEntry(subRequestBundle, myNextReqEntry);
|
||||
|
||||
IBaseBundle nextResponseBundle = processTransactionAsSubRequest(myRequestDetails, subRequestBundle, "Batch sub-request", myNestedMode);
|
||||
|
||||
IBase subResponseEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
|
||||
myResponseMap.put(myResponseOrder, subResponseEntry);
|
||||
|
||||
/*
|
||||
* If the individual entry didn't have a resource in its response, bring the sub-transaction's OperationOutcome across so the client can see it
|
||||
*/
|
||||
if (myVersionAdapter.getResource(subResponseEntry) == null) {
|
||||
IBase nextResponseBundleFirstEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
|
||||
myResponseMap.put(myResponseOrder, nextResponseBundleFirstEntry);
|
||||
}
|
||||
}
|
||||
private boolean processBatchEntryWithRetry() {
|
||||
int maxAttempts =3;
|
||||
for (int attempt = 1;; attempt++) {
|
||||
try {
|
||||
processBatchEntry();
|
||||
return true;
|
||||
} catch (BaseServerResponseException e) {
|
||||
//If we catch a known and structured exception from HAPI, just fail.
|
||||
myLastSeenException = e;
|
||||
return false;
|
||||
} catch (Throwable t) {
|
||||
myLastSeenException = new InternalErrorException(t);
|
||||
//If we have caught a non-tag-storage failure we are unfamiliar with, or we have exceeded max attempts, exit.
|
||||
if (!DaoFailureUtil.isTagStorageFailure(t) || attempt >= maxAttempts) {
|
||||
ourLog.error("Failure during BATCH sub transaction processing", t);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
|
||||
try {
|
||||
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
|
||||
myVersionAdapter.addEntry(subRequestBundle, myNextReqEntry);
|
||||
|
||||
IBaseBundle nextResponseBundle = processTransactionAsSubRequest(myRequestDetails, subRequestBundle, "Batch sub-request", myNestedMode);
|
||||
|
||||
IBase subResponseEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
|
||||
myResponseMap.put(myResponseOrder, subResponseEntry);
|
||||
|
||||
/*
|
||||
* If the individual entry didn't have a resource in its response, bring the sub-transaction's OperationOutcome across so the client can see it
|
||||
*/
|
||||
if (myVersionAdapter.getResource(subResponseEntry) == null) {
|
||||
IBase nextResponseBundleFirstEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
|
||||
myResponseMap.put(myResponseOrder, nextResponseBundleFirstEntry);
|
||||
}
|
||||
|
||||
} catch (BaseServerResponseException e) {
|
||||
caughtEx.setException(e);
|
||||
} catch (Throwable t) {
|
||||
ourLog.error("Failure during BATCH sub transaction processing", t);
|
||||
caughtEx.setException(new InternalErrorException(t));
|
||||
boolean success = processBatchEntryWithRetry();
|
||||
if (!success) {
|
||||
populateResponseMapWithLastSeenException();
|
||||
}
|
||||
|
||||
if (caughtEx.getException() != null) {
|
||||
// add exception to the response map
|
||||
myResponseMap.put(myResponseOrder, caughtEx);
|
||||
}
|
||||
|
||||
// checking for the parallelism
|
||||
ourLog.debug("processing bacth for {} is completed", myVersionAdapter.getEntryRequestUrl(myNextReqEntry));
|
||||
ourLog.debug("processing batch for {} is completed", myVersionAdapter.getEntryRequestUrl(myNextReqEntry));
|
||||
myCompletedLatch.countDown();
|
||||
}
|
||||
|
||||
private void populateResponseMapWithLastSeenException() {
|
||||
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
|
||||
caughtEx.setException(myLastSeenException);
|
||||
myResponseMap.put(myResponseOrder, caughtEx);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
package ca.uhn.fhir.jpa.dao;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* Utility class to help identify classes of failure.
|
||||
*/
|
||||
public class DaoFailureUtil {
|
||||
|
||||
public static boolean isTagStorageFailure(Throwable t) {
|
||||
if (StringUtils.isBlank(t.getMessage())) {
|
||||
return false;
|
||||
} else {
|
||||
String msg = t.getMessage().toLowerCase();
|
||||
return msg.contains("hfj_tag_def") || msg.contains("hfj_res_tag");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
|
|||
import ca.uhn.fhir.interceptor.api.Pointcut;
|
||||
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
|
||||
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
|
||||
import ca.uhn.fhir.jpa.dao.DaoFailureUtil;
|
||||
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
|
||||
|
@ -93,10 +94,9 @@ public class HapiTransactionService {
|
|||
* known to the system already, they'll both try to create a row in HFJ_TAG_DEF,
|
||||
* which is the tag definition table. In that case, a constraint error will be
|
||||
* thrown by one of the client threads, so we auto-retry in order to avoid
|
||||
* annopying spurious failures for the client.
|
||||
* annoying spurious failures for the client.
|
||||
*/
|
||||
if (e.getMessage().contains("HFJ_TAG_DEF") || e.getMessage().contains("hfj_tag_def") ||
|
||||
e.getMessage().contains("HFJ_RES_TAG") || e.getMessage().contains("hfj_res_tag")) {
|
||||
if (DaoFailureUtil.isTagStorageFailure(e)) {
|
||||
maxRetries = 3;
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
|
|||
import ca.uhn.fhir.jpa.model.entity.ResourceTable;
|
||||
import ca.uhn.fhir.jpa.model.entity.ResourceTag;
|
||||
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
|
||||
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
|
||||
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
|
||||
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
|
||||
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;
|
||||
|
|
|
@ -249,6 +249,45 @@ public class ResourceProviderR4BundleTest extends BaseResourceProviderR4Test {
|
|||
assertEquals(ids.get(4), bundleEntries.get(6).getResource().getIdElement().toUnqualifiedVersionless().getValueAsString());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTagCacheWorksWithBatchMode() {
|
||||
Bundle input = new Bundle();
|
||||
input.setType(BundleType.BATCH);
|
||||
|
||||
Patient p = new Patient();
|
||||
p.setId("100");
|
||||
p.setGender(AdministrativeGender.MALE);
|
||||
p.addIdentifier().setSystem("urn:foo").setValue("A");
|
||||
p.addName().setFamily("Smith");
|
||||
p.getMeta().addTag().setSystem("mysystem").setCode("mycode");
|
||||
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.POST);
|
||||
|
||||
Patient p2 = new Patient();
|
||||
p2.setId("200");
|
||||
p2.setGender(AdministrativeGender.MALE);
|
||||
p2.addIdentifier().setSystem("urn:foo").setValue("A");
|
||||
p2.addName().setFamily("Smith");
|
||||
p2.getMeta().addTag().setSystem("mysystem").setCode("mycode");
|
||||
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.POST);
|
||||
|
||||
Patient p3 = new Patient();
|
||||
p3.setId("pat-300");
|
||||
p3.setGender(AdministrativeGender.MALE);
|
||||
p3.addIdentifier().setSystem("urn:foo").setValue("A");
|
||||
p3.addName().setFamily("Smith");
|
||||
p3.getMeta().addTag().setSystem("mysystem").setCode("mycode");
|
||||
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.PUT).setUrl("Patient/pat-300");
|
||||
|
||||
Bundle output = myClient.transaction().withBundle(input).execute();
|
||||
output.getEntry().stream()
|
||||
.map(BundleEntryComponent::getResponse)
|
||||
.map(Bundle.BundleEntryResponseComponent::getStatus)
|
||||
.forEach(statusCode -> {
|
||||
assertEquals(statusCode, "201 Created");
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private List<String> createPatients(int count) {
|
||||
List<String> ids = new ArrayList<String>();
|
||||
|
|
Loading…
Reference in New Issue