Support retry of $reindex version conflict (#4603)

* Support retry of $reindex conflict
* Notes and cleanup of Phaser
This commit is contained in:
michaelabuckley 2023-02-28 12:53:06 -05:00 committed by GitHub
parent bf495e2d92
commit ec13b751fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 83 deletions

View File

@ -0,0 +1,5 @@
---
type: change
issue: 4603
title: "Transaction retry will now also apply to ObjectOptimisticLockingFailureException. This enables retry of
$reindex work chunks when they collide with a DELETE operation."

View File

@ -8,8 +8,11 @@ import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails; import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import ca.uhn.fhir.storage.test.DaoTestDataBuilder; import ca.uhn.fhir.storage.test.DaoTestDataBuilder;
import ca.uhn.test.concurrency.LockstepEnumPhaser; import ca.uhn.test.concurrency.LockstepEnumPhaser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hamcrest.Matchers;
import org.hl7.fhir.instance.model.api.IIdType; import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Observation;
import org.hl7.fhir.r4.model.SearchParameter; import org.hl7.fhir.r4.model.SearchParameter;
@ -21,15 +24,15 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Propagation;
import javax.annotation.Nonnull;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ContextConfiguration(classes = { @ContextConfiguration(classes = {
@ -49,12 +52,12 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
* The $reindex step processes several resources in a single tx. * The $reindex step processes several resources in a single tx.
* The tested sequence here is: job step $reindexes a resouce, then another thread DELETEs the resource, * The tested sequence here is: job step $reindexes a resouce, then another thread DELETEs the resource,
* then later, the $reindex step finishes the rest of the resources and commits AFTER the DELETE commits. * then later, the $reindex step finishes the rest of the resources and commits AFTER the DELETE commits.
* * This scenario could insert index rows into HFJ_SPIDX_TOKEN even though the resource was gone.
* This was inserting new index rows into HFJ_SPIDX_TOKEN even though the resource was gone.
* This is an illegal state for our index. Deleted resources should never have content in HFJ_SPIDX_* * This is an illegal state for our index. Deleted resources should never have content in HFJ_SPIDX_*
* Fixed by taking an optimistic lock on hfj_resource even though $reindex is read-only on that table.
*/ */
@Test @Test
void deleteOverlapsWithReindex_leavesIndexRowsP() throws InterruptedException, ExecutionException { void deleteOverlapsWithReindex_leavesIndexRowsP() {
LockstepEnumPhaser<Steps> phaser = new LockstepEnumPhaser<>(2, Steps.class); LockstepEnumPhaser<Steps> phaser = new LockstepEnumPhaser<>(2, Steps.class);
ourLog.info("An observation is created"); ourLog.info("An observation is created");
@ -90,10 +93,8 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
assertEquals(1, getSPIDXDateCount(observationPid), "still only one index row before reindex"); assertEquals(1, getSPIDXDateCount(observationPid), "still only one index row before reindex");
// suppose reindex job step starts here and loads the resource and ResourceTable entity // suppose reindex job step starts here and loads the resource and ResourceTable entity
ThreadFactory loggingThreadFactory = getLoggingThreadFactory("Reindex-thread"); ExecutorService backgroundReindexThread = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder().namingPattern("Reindex-thread-%d").build());
ExecutorService backgroundReindexThread = Executors.newSingleThreadExecutor(loggingThreadFactory);
Future<Integer> backgroundResult = backgroundReindexThread.submit(() -> { Future<Integer> backgroundResult = backgroundReindexThread.submit(() -> {
try { try {
callInFreshTx((tx, rd) -> { callInFreshTx((tx, rd) -> {
@ -120,6 +121,7 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
return 0; return 0;
}); });
} finally { } finally {
ourLog.info("$reindex commit complete");
phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX); phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX);
} }
return 1; return 1;
@ -131,9 +133,8 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX); phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX);
// then the resource is deleted // then the resource is deleted
phaser.assertInPhase(Steps.RUN_DELETE);
ourLog.info("Deleting observation"); ourLog.info("Deleting observation");
phaser.assertInPhase(Steps.RUN_DELETE);
callInFreshTx((tx, rd) -> myObservationDao.delete(observationId, rd)); callInFreshTx((tx, rd) -> myObservationDao.delete(observationId, rd));
assertResourceDeleted(observationId); assertResourceDeleted(observationId);
assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should have 0 index rows"); assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should have 0 index rows");
@ -146,22 +147,12 @@ class ReindexRaceBugTest extends BaseJpaR4Test {
phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX); phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX);
assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should still have 0 index rows, after $reindex completes"); assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should still have 0 index rows, after $reindex completes");
}
@Nonnull // Verify the exception from $reindex
static ThreadFactory getLoggingThreadFactory(String theThreadName) { // In a running server, we expect UserRequestRetryVersionConflictsInterceptor to cause a retry inside the ReindexStep
ThreadFactory loggingThreadFactory = r -> new Thread(() -> { // But here in the test, we have not configured any retry logic.
boolean success = false; ExecutionException e = assertThrows(ExecutionException.class, backgroundResult::get, "Optimistic locking detects the DELETE and rolls back");
try { assertThat("Hapi maps conflict exception type", e.getCause(), Matchers.instanceOf(ResourceVersionConflictException.class));
r.run();
success = true;
} finally {
if (!success) {
ourLog.error("Background thread failed");
}
}
}, theThreadName);
return loggingThreadFactory;
} }
void assertResourceDeleted(IIdType observationId) { void assertResourceDeleted(IIdType observationId) {

View File

@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException; import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Isolation;
@ -222,7 +223,7 @@ public class HapiTransactionService implements IHapiTransactionService {
return doExecuteCallback(theExecutionBuilder, theCallback); return doExecuteCallback(theExecutionBuilder, theCallback);
} catch (ResourceVersionConflictException | DataIntegrityViolationException e) { } catch (ResourceVersionConflictException | DataIntegrityViolationException | ObjectOptimisticLockingFailureException e) {
ourLog.debug("Version conflict detected", e); ourLog.debug("Version conflict detected", e);
if (theExecutionBuilder.myOnRollback != null) { if (theExecutionBuilder.myOnRollback != null) {

View File

@ -9,17 +9,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
/** /**
* Test helper to force a particular sequence on 2 or more threads. * Test helper to force a particular sequence on multiple threads.
* Wraps Phaser with an Enum for better messages, and some test support. * Wraps Phaser with an Enum for better messages, and some test support.
* The only use is to impose a particular execution sequence over multiple threads when reproducing bugs. * The only use is to impose a particular execution sequence over multiple threads when reproducing bugs.
* * <p>
* The simplest usage is to declare the number of collaborators as theParticipantCount * The simplest usage is to declare the number of collaborators as theParticipantCount
* in the constructor, and then have each participant thread call {@link #arriveAndAwaitSharedEndOf} * in the constructor, and then have each participant thread call {@link #arriveAndAwaitSharedEndOf}
* as they finish the work of every phase. * as they finish the work of every phase.
* Every thread needs to confirm, even if they do no work in that phase. * Every thread needs to confirm, even if they do no work in that phase.
* <p> * <p>
* Note: this is just a half-baked wrapper around Phaser. * Note: this is just a half-baked wrapper around Phaser.
* The behaviour is not especially precise, or tested. Comments welcome: MB.
* *
* @param <E> an enum used to name the phases. * @param <E> an enum used to name the phases.
*/ */
@ -35,60 +34,72 @@ public class LockstepEnumPhaser<E extends Enum<E>> {
myEnumConstants = myEnumClass.getEnumConstants(); myEnumConstants = myEnumClass.getEnumConstants();
} }
public E arrive() {
E result = phaseToEnum(myPhaser.arrive());
ourLog.info("Arrive in phase {}", result);
return result;
}
public void assertInPhase(E theStageEnum) { public void assertInPhase(E theStageEnum) {
assertEquals(theStageEnum, getPhase(), "In stage " + theStageEnum); assertEquals(theStageEnum, getPhase(), "In stage " + theStageEnum);
} }
/**
* Get the current phase.
*/
public E getPhase() { public E getPhase() {
return phaseToEnum(myPhaser.getPhase()); return phaseToEnum(myPhaser.getPhase());
} }
public E awaitAdvance(E thePhase) { /**
* Declare that this thread-participant has finished the work of thePhase,
* and then wait until all other participants have also finished.
*
* @param thePhase the phase the thread just completed
* @return the new phase starting.
*/
public E arriveAndAwaitSharedEndOf(E thePhase) {
checkAwait(thePhase); checkAwait(thePhase);
E current = arrive();
assertEquals(thePhase, current);
return doAwait(thePhase); return doAwait(thePhase);
} }
/** /**
* Like arrive(), but verify stage first * Finish a phase, and deregister so that later stages can complete
* with a reduced participant count.
*/ */
public E arriveAtMyEndOf(E thePhase) {
assertInPhase(thePhase);
return arrive();
}
public E arriveAndAwaitSharedEndOf(E thePhase) {
checkAwait(thePhase);
arrive();
return doAwait(thePhase);
}
public E arriveAndDeregister() { public E arriveAndDeregister() {
return phaseToEnum(myPhaser.arriveAndDeregister()); return phaseToEnum(myPhaser.arriveAndDeregister());
} }
/**
* Add a new participant to the pool.
* Later await calls will wait for one more arrival before proceeding.
*/
public E register() { public E register() {
return phaseToEnum(myPhaser.register()); return phaseToEnum(myPhaser.register());
} }
E arrive() {
E result = phaseToEnum(myPhaser.arrive());
ourLog.info("Arrive to my end of phase {}", result);
return result;
}
private E doAwait(E thePhase) { private E doAwait(E thePhase) {
ourLog.debug("Start doAwait - {}", thePhase); ourLog.debug("Start doAwait - {}", thePhase);
E phase = phaseToEnum(myPhaser.awaitAdvance(thePhase.ordinal())); E phase = phaseToEnum(myPhaser.awaitAdvance(thePhase.ordinal()));
ourLog.info("Finish doAwait - {}", thePhase); ourLog.info("Done waiting for end of {}. Now starting {}", thePhase, getPhase());
return phase; return phase;
} }
/**
* Defensively verify that the phase we are waiting to end is actually the current phase.
*/
private void checkAwait(E thePhase) { private void checkAwait(E thePhase) {
E currentPhase = getPhase(); E currentPhase = getPhase();
if (currentPhase.ordinal() < thePhase.ordinal()) { if (currentPhase.ordinal() < thePhase.ordinal()) {
fail("Can't wait for end of phase " + thePhase + ", still in phase " + currentPhase); // Explicitly progressing lock-step is safer for most tests.
// But we could allow waiting multiple phases with a loop here instead of failing. MB
fail(String.format("Can't wait for end of phase %s, still in phase %s", thePhase, currentPhase));
} else if (currentPhase.ordinal() > thePhase.ordinal()) { } else if (currentPhase.ordinal() > thePhase.ordinal()) {
ourLog.warn("Skip waiting for phase {}, already in phase {}", thePhase, currentPhase); fail(String.format("Can't wait for end of phase %s, already in phase %s", thePhase, currentPhase));
} }
} }

View File

@ -8,6 +8,7 @@ import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -26,6 +27,9 @@ import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.TWO;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
// All of these should run pretty quickly - 5s should be lots.
// But if they deadlock, they will hang forever. Need @Timeout.
@Timeout(5)
class LockstepEnumPhaserTest { class LockstepEnumPhaserTest {
private static final Logger ourLog = LoggerFactory.getLogger(LockstepEnumPhaserTest.class); private static final Logger ourLog = LoggerFactory.getLogger(LockstepEnumPhaserTest.class);
final ExecutorService myExecutorService = Executors.newFixedThreadPool(10); final ExecutorService myExecutorService = Executors.newFixedThreadPool(10);
@ -39,7 +43,6 @@ class LockstepEnumPhaserTest {
LockstepEnumPhaser<Stages> myPhaser; LockstepEnumPhaser<Stages> myPhaser;
@Timeout(1)
@Test @Test
void phaserWithOnePariticpant_worksFine() { void phaserWithOnePariticpant_worksFine() {
// given // given
@ -47,7 +50,7 @@ class LockstepEnumPhaserTest {
myPhaser.assertInPhase(ONE); myPhaser.assertInPhase(ONE);
myPhaser.arriveAtMyEndOf(ONE); myPhaser.arriveAndAwaitSharedEndOf(ONE);
myPhaser.arriveAndAwaitSharedEndOf(TWO); myPhaser.arriveAndAwaitSharedEndOf(TWO);
@ -56,7 +59,6 @@ class LockstepEnumPhaserTest {
myPhaser.assertInPhase(FINISHED); myPhaser.assertInPhase(FINISHED);
} }
@Timeout(5)
@Test @Test
void phaserWithTwoThreads_runsInLockStep() throws InterruptedException, ExecutionException { void phaserWithTwoThreads_runsInLockStep() throws InterruptedException, ExecutionException {
// given // given
@ -82,10 +84,7 @@ class LockstepEnumPhaserTest {
myPhaser.assertInPhase(THREE); myPhaser.assertInPhase(THREE);
recordProgress(threadId); recordProgress(threadId);
Stages nextStage = myPhaser.awaitAdvance(TWO); myPhaser.arriveAndAwaitSharedEndOf(THREE);
assertEquals(THREE, nextStage);
myPhaser.arriveAtMyEndOf(THREE);
ourLog.info("Finished"); ourLog.info("Finished");
@ -104,33 +103,13 @@ class LockstepEnumPhaserTest {
myProgressEvents.add(Pair.of(threadId, myPhaser.getPhase())); myProgressEvents.add(Pair.of(threadId, myPhaser.getPhase()));
} }
@Timeout(5)
@Test @Test
void phaserWithTwoThreads_canAddThird_sequencContinues() throws InterruptedException, ExecutionException { void phaserWithTwoThreads_canAddThird_sequencContinues() throws InterruptedException, ExecutionException {
// given // given
myPhaser = new LockstepEnumPhaser<>(2, Stages.class); myPhaser = new LockstepEnumPhaser<>(2, Stages.class);
// run one simple schedule // run one simple schedule
Callable<Integer> schedule1 = ()->{ Callable<Integer> schedule1 = buildSimpleCountingSchedule(1);
int threadId = 1;
ourLog.info("Starting schedule1");
myPhaser.assertInPhase(ONE);
recordProgress(threadId);
myPhaser.arriveAndAwaitSharedEndOf(ONE);
recordProgress(threadId);
myPhaser.arriveAndAwaitSharedEndOf(TWO);
recordProgress(threadId);
myPhaser.arriveAndAwaitSharedEndOf(THREE);
ourLog.info("Finished schedule1");
return 1;
};
// this schedule will start half-way in // this schedule will start half-way in
Callable<Integer> schedule2 = ()->{ Callable<Integer> schedule2 = ()->{
int threadId = 2; int threadId = 2;
@ -149,7 +128,7 @@ class LockstepEnumPhaserTest {
ourLog.info("Finished schedule2"); ourLog.info("Finished schedule2");
return 1; return 2;
}; };
// this schedule will start schedule 2 half-way // this schedule will start schedule 2 half-way
Callable<Integer> schedule3 = ()->{ Callable<Integer> schedule3 = ()->{
@ -174,17 +153,83 @@ class LockstepEnumPhaserTest {
ourLog.info("Finished schedule3"); ourLog.info("Finished schedule3");
return 1; return 3;
}; };
Future<Integer> result1 = myExecutorService.submit(schedule1); Future<Integer> result1 = myExecutorService.submit(schedule1);
Future<Integer> result2 = myExecutorService.submit(schedule3); Future<Integer> result3 = myExecutorService.submit(schedule3);
assertEquals(1, result1.get()); assertEquals(1, result1.get());
assertEquals(1, result2.get()); assertEquals(3, result3.get());
assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator)); assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator));
assertThat("all progress logged", myProgressEvents, Matchers.hasSize(8)); assertThat("all progress logged", myProgressEvents, Matchers.hasSize(8));
} }
@Nonnull
private Callable<Integer> buildSimpleCountingSchedule(int theThreadId) {
Callable<Integer> schedule = ()->{
ourLog.info("Starting schedule - {}", theThreadId);
myPhaser.assertInPhase(ONE);
recordProgress(theThreadId);
myPhaser.arriveAndAwaitSharedEndOf(ONE);
recordProgress(theThreadId);
myPhaser.arriveAndAwaitSharedEndOf(TWO);
recordProgress(theThreadId);
myPhaser.arriveAndAwaitSharedEndOf(THREE);
ourLog.info("Finished schedule1");
return theThreadId;
};
return schedule;
}
@Test
void aShortScheduleDeregister_allowsRemainingParticipantsToContinue() throws ExecutionException, InterruptedException {
// given
myPhaser = new LockstepEnumPhaser<>(3, Stages.class);
// Three schedules, but with one that leaves early
// sched 1,2 counting
// sched 3 start, but end with 2.
Callable<Integer> schedule1 = buildSimpleCountingSchedule(1);
Callable<Integer> schedule2 = buildSimpleCountingSchedule(2);
Callable<Integer> schedule3 = () -> {
int threadId = 3;
ourLog.info("Starting schedule - {}", threadId);
myPhaser.assertInPhase(ONE);
recordProgress(threadId);
myPhaser.arriveAndAwaitSharedEndOf(ONE);
recordProgress(threadId);
ourLog.info("Leaving schedule - {}", threadId);
Stages deregisterPhase = myPhaser.arriveAndDeregister();
assertEquals(TWO, deregisterPhase);
return threadId;
};
Future<Integer> result1 = myExecutorService.submit(schedule1);
Future<Integer> result2 = myExecutorService.submit(schedule2);
Future<Integer> result3 = myExecutorService.submit(schedule3);
assertEquals(1, result1.get());
assertEquals(2, result2.get());
assertEquals(3, result3.get());
assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator));
assertThat("all progress logged", myProgressEvents, Matchers.hasSize(2*3 + 2));
}
} }