Add failing test, implementation

This commit is contained in:
Tadgh 2021-09-09 10:50:54 -04:00
parent 0ccd71faf8
commit 41d5e2b8e7
5 changed files with 120 additions and 38 deletions

View File

@ -80,6 +80,7 @@ import ca.uhn.fhir.util.UrlUtil;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap; import com.google.common.collect.ListMultimap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.Validate;
import org.hl7.fhir.dstu3.model.Bundle; import org.hl7.fhir.dstu3.model.Bundle;
import org.hl7.fhir.exceptions.FHIRException; import org.hl7.fhir.exceptions.FHIRException;
@ -115,6 +116,7 @@ import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -366,8 +368,8 @@ public abstract class BaseTransactionProcessor {
IBase nextRequestEntry = null; IBase nextRequestEntry = null;
for (int i=0; i<requestEntriesSize; i++ ) { for (int i=0; i<requestEntriesSize; i++ ) {
nextRequestEntry = requestEntries.get(i); nextRequestEntry = requestEntries.get(i);
BundleTask bundleTask = new BundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode); RetriableBundleTask retriableBundleTask = new RetriableBundleTask(completionLatch, theRequestDetails, responseMap, i, nextRequestEntry, theNestedMode);
getTaskExecutor().execute(bundleTask); getTaskExecutor().execute(retriableBundleTask);
} }
// waiting for all tasks to be completed // waiting for all tasks to be completed
@ -1684,7 +1686,7 @@ public abstract class BaseTransactionProcessor {
return theStatusCode + " " + defaultString(Constants.HTTP_STATUS_NAMES.get(theStatusCode)); return theStatusCode + " " + defaultString(Constants.HTTP_STATUS_NAMES.get(theStatusCode));
} }
public class BundleTask implements Runnable { public class RetriableBundleTask implements Runnable {
private CountDownLatch myCompletedLatch; private CountDownLatch myCompletedLatch;
private RequestDetails myRequestDetails; private RequestDetails myRequestDetails;
@ -1692,20 +1694,19 @@ public abstract class BaseTransactionProcessor {
private Map<Integer, Object> myResponseMap; private Map<Integer, Object> myResponseMap;
private int myResponseOrder; private int myResponseOrder;
private boolean myNestedMode; private boolean myNestedMode;
private Throwable myLastSeenException;
protected BundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) { protected RetriableBundleTask(CountDownLatch theCompletedLatch, RequestDetails theRequestDetails, Map<Integer, Object> theResponseMap, int theResponseOrder, IBase theNextReqEntry, boolean theNestedMode) {
this.myCompletedLatch = theCompletedLatch; this.myCompletedLatch = theCompletedLatch;
this.myRequestDetails = theRequestDetails; this.myRequestDetails = theRequestDetails;
this.myNextReqEntry = theNextReqEntry; this.myNextReqEntry = theNextReqEntry;
this.myResponseMap = theResponseMap; this.myResponseMap = theResponseMap;
this.myResponseOrder = theResponseOrder; this.myResponseOrder = theResponseOrder;
this.myNestedMode = theNestedMode; this.myNestedMode = theNestedMode;
this.myLastSeenException = null;
} }
@Override private void processBatchEntry() {
public void run() {
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
try {
IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode()); IBaseBundle subRequestBundle = myVersionAdapter.createBundle(org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION.toCode());
myVersionAdapter.addEntry(subRequestBundle, (IBase) myNextReqEntry); myVersionAdapter.addEntry(subRequestBundle, (IBase) myNextReqEntry);
@ -1721,22 +1722,45 @@ public abstract class BaseTransactionProcessor {
IBase nextResponseBundleFirstEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0); IBase nextResponseBundleFirstEntry = (IBase) myVersionAdapter.getEntries(nextResponseBundle).get(0);
myResponseMap.put(myResponseOrder, nextResponseBundleFirstEntry); myResponseMap.put(myResponseOrder, nextResponseBundleFirstEntry);
} }
}
private boolean processBatchEntryWithRetry() {
int maxAttempts =3;
for (int attempt = 1;; attempt++) {
try {
processBatchEntry();
return true;
} catch (BaseServerResponseException e) { } catch (BaseServerResponseException e) {
caughtEx.setException(e); //If we catch a known and structured exception from HAPI, just fail.
myLastSeenException = e;
return false;
} catch (Throwable t) { } catch (Throwable t) {
myLastSeenException = t;
//If we have caught a non-tag-storage failure we are unfamiliar with, or we have exceeded max attempts, exit.
if (!DaoFailureUtil.isTagStorageFailure(t) || attempt >= maxAttempts) {
ourLog.error("Failure during BATCH sub transaction processing", t); ourLog.error("Failure during BATCH sub transaction processing", t);
caughtEx.setException(new InternalErrorException(t)); return false;
}
}
}
} }
if (caughtEx.getException() != null) { @Override
// add exception to the response map public void run() {
myResponseMap.put(myResponseOrder, caughtEx); boolean success = processBatchEntryWithRetry();
if (!success) {
populateResponseMapWithLastSeenException();
} }
// checking for the parallelism // checking for the parallelism
ourLog.debug("processing bacth for {} is completed", myVersionAdapter.getEntryRequestUrl((IBase)myNextReqEntry)); ourLog.debug("Processing batch entry for {} is completed", myVersionAdapter.getEntryRequestUrl((IBase)myNextReqEntry));
myCompletedLatch.countDown(); myCompletedLatch.countDown();
} }
private void populateResponseMapWithLastSeenException() {
BaseServerResponseExceptionHolder caughtEx = new BaseServerResponseExceptionHolder();
caughtEx.setException(new InternalErrorException(myLastSeenException));
myResponseMap.put(myResponseOrder, caughtEx);
}
} }
} }

View File

@ -0,0 +1,18 @@
package ca.uhn.fhir.jpa.dao;
import org.apache.commons.lang3.StringUtils;
/**
* Utility class to help identify classes of failure.
*/
public class DaoFailureUtil {
public static boolean isTagStorageFailure(Throwable t) {
if (StringUtils.isBlank(t.getMessage())) {
return false;
} else {
String msg = t.getMessage().toLowerCase();
return msg.contains("hfj_tag_def") || msg.contains("hfj_res_tag");
}
}
}

View File

@ -25,6 +25,7 @@ import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut; import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy; import ca.uhn.fhir.jpa.api.model.ResourceVersionConflictResolutionStrategy;
import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao; import ca.uhn.fhir.jpa.dao.BaseHapiFhirDao;
import ca.uhn.fhir.jpa.dao.DaoFailureUtil;
import ca.uhn.fhir.rest.api.server.RequestDetails; import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
@ -93,10 +94,9 @@ public class HapiTransactionService {
* known to the system already, they'll both try to create a row in HFJ_TAG_DEF, * known to the system already, they'll both try to create a row in HFJ_TAG_DEF,
* which is the tag definition table. In that case, a constraint error will be * which is the tag definition table. In that case, a constraint error will be
* thrown by one of the client threads, so we auto-retry in order to avoid * thrown by one of the client threads, so we auto-retry in order to avoid
* annopying spurious failures for the client. * annoying spurious failures for the client.
*/ */
if (e.getMessage().contains("HFJ_TAG_DEF") || e.getMessage().contains("hfj_tag_def") || if (DaoFailureUtil.isTagStorageFailure(e)) {
e.getMessage().contains("HFJ_RES_TAG") || e.getMessage().contains("hfj_res_tag")) {
maxRetries = 3; maxRetries = 3;
} }

View File

@ -10,6 +10,7 @@ import ca.uhn.fhir.jpa.model.entity.ResourceIndexedSearchParamString;
import ca.uhn.fhir.jpa.model.entity.ResourceTable; import ca.uhn.fhir.jpa.model.entity.ResourceTable;
import ca.uhn.fhir.jpa.model.entity.ResourceTag; import ca.uhn.fhir.jpa.model.entity.ResourceTag;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum; import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import ca.uhn.fhir.jpa.partition.SystemRequestDetails;
import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test; import ca.uhn.fhir.jpa.provider.SystemProviderDstu2Test;
import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum; import ca.uhn.fhir.model.api.ResourceMetadataKeyEnum;

View File

@ -250,6 +250,45 @@ public class ResourceProviderR4BundleTest extends BaseResourceProviderR4Test {
} }
@Test
public void testTagCacheWorksWithBatchMode() {
Bundle input = new Bundle();
input.setType(BundleType.BATCH);
Patient p = new Patient();
p.setId("100");
p.setGender(AdministrativeGender.MALE);
p.addIdentifier().setSystem("urn:foo").setValue("A");
p.addName().setFamily("Smith");
p.getMeta().addTag().setSystem("mysystem").setCode("mycode");
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.POST);
Patient p2 = new Patient();
p2.setId("200");
p2.setGender(AdministrativeGender.MALE);
p2.addIdentifier().setSystem("urn:foo").setValue("A");
p2.addName().setFamily("Smith");
p2.getMeta().addTag().setSystem("mysystem").setCode("mycode");
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.POST);
Patient p3 = new Patient();
p3.setId("pat-300");
p3.setGender(AdministrativeGender.MALE);
p3.addIdentifier().setSystem("urn:foo").setValue("A");
p3.addName().setFamily("Smith");
p3.getMeta().addTag().setSystem("mysystem").setCode("mycode");
input.addEntry().setResource(p).getRequest().setMethod(HTTPVerb.PUT).setUrl("Patient/pat-300");
Bundle output = myClient.transaction().withBundle(input).execute();
output.getEntry().stream()
.map(BundleEntryComponent::getResponse)
.map(Bundle.BundleEntryResponseComponent::getStatus)
.forEach(statusCode -> {
assertEquals(statusCode, "201 Created");
});
}
private List<String> createPatients(int count) { private List<String> createPatients(int count) {
List<String> ids = new ArrayList<String>(); List<String> ids = new ArrayList<String>();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {