diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java index 1e1bda39256..6e86e02e6b1 100644 --- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/dao/r4/FhirResourceDaoR4ConcurrentWriteTest.java @@ -167,6 +167,82 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test { } + /** + * Make a transaction with conditional updates that will fail due to + * constraint errors and be retried automatically. Make sure that the + * retry succeeds and that the data ultimately gets written. + */ + @Test + public void testConcurrentTransactionConditionalUpdates() throws ExecutionException, InterruptedException { + myInterceptorRegistry.registerInterceptor(myRetryInterceptor); + + Patient pt = new Patient(); + pt.setId("Patient/A"); + pt.addIdentifier().setSystem("http://foo").setValue("pt1"); + myPatientDao.update(pt); + + Observation obs = new Observation(); + obs.setId("Observation/O"); + obs.addIdentifier().setSystem("http://foo").setValue("obs1"); + myObservationDao.update(obs); + + AtomicInteger counter = new AtomicInteger(0); + Runnable creator = () -> { + BundleBuilder bb = new BundleBuilder(myFhirCtx); + + Patient patient = new Patient(); + patient.setId(IdType.newRandomUuid()); + patient.addIdentifier().setSystem("http://foo").setValue("pt1"); + patient.addName().setFamily("fam-" + counter.incrementAndGet()); + bb.addTransactionUpdateEntry(patient).conditional("Patient?identifier=http://foo|pt1"); + + Observation observation = new Observation(); + observation.setId(IdType.newRandomUuid()); + observation.addIdentifier().setSystem("http://foo").setValue("obs1"); + observation.getCode().setText("obs-" + counter.incrementAndGet()); + observation.getSubject().setReference(patient.getId()); + bb.addTransactionUpdateEntry(observation).conditional("Observation?identifier=http://foo|obs1"); + + Bundle input = (Bundle) bb.getBundle(); + input.getEntry().get(0).getResource().setId("Patient/A"); + input.getEntry().get(1).getResource().setId("Observation/O"); + SystemRequestDetails requestDetails = new SystemRequestDetails(); + UserRequestRetryVersionConflictsInterceptor.addRetryHeader(requestDetails, 20); + mySystemDao.transaction(requestDetails, input); + }; + + List> futures = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + futures.add(myExecutor.submit(creator)); + } + + for (Future next : futures) { + next.get(); + } + + runInTransaction(() -> { + Map counts = new TreeMap<>(); + myResourceTableDao + .findAll() + .stream() + .forEach(t -> { + counts.putIfAbsent(t.getResourceType(), 0); + int value = counts.get(t.getResourceType()); + value++; + counts.put(t.getResourceType(), value); + }); + ourLog.info("Counts: {}", counts); + + assertEquals(1, counts.get("Patient"), counts.toString()); + assertEquals(1, counts.get("Observation"), counts.toString()); + logAllResourceLinks(); + assertEquals(2, myResourceLinkDao.count()); + assertEquals(22, myResourceHistoryTableDao.count()); + }); + + } + + @Test public void testCreateWithClientAssignedId() { myInterceptorRegistry.registerInterceptor(myRetryInterceptor);