From cedf69516b5ece527b8a6c941cacd4b7ac1e0857 Mon Sep 17 00:00:00 2001 From: michaelabuckley Date: Sun, 26 Feb 2023 22:12:12 -0500 Subject: [PATCH] Simultaneous $reindex and DELETE can orphan index rows (#4584) Add optimistic write lock to $reindex operation to prevent indexing stale data. Introduce Lockstep Phaser to make testing easier Add 1 query to every resource reindex :-( --- .../6_6_0/4584-reindex-orphan-index-rows.yaml | 4 + .../fhir/jpa/dao/BaseHapiFhirResourceDao.java | 13 +- .../extractor/BaseSearchParamExtractor.java | 1 + .../uhn/fhir/jpa/reindex/ReindexStepTest.java | 10 +- .../search/reindex/ReindexRaceBugTest.java | 192 ++++++++++++++++++ .../ca/uhn/fhir/jpa/test/BaseJpaR4Test.java | 1 + hapi-fhir-test-utilities/pom.xml | 14 +- .../test/concurrency/LockstepEnumPhaser.java | 104 ++++++++++ .../concurrency/LockstepEnumPhaserTest.java | 190 +++++++++++++++++ 9 files changed, 517 insertions(+), 12 deletions(-) create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4584-reindex-orphan-index-rows.yaml create mode 100644 hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/search/reindex/ReindexRaceBugTest.java create mode 100644 hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/LockstepEnumPhaser.java create mode 100644 hapi-fhir-test-utilities/src/test/java/ca/uhn/test/concurrency/LockstepEnumPhaserTest.java diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4584-reindex-orphan-index-rows.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4584-reindex-orphan-index-rows.yaml new file mode 100644 index 00000000000..2f23b4319a5 --- /dev/null +++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/6_6_0/4584-reindex-orphan-index-rows.yaml @@ -0,0 +1,4 @@ +--- +type: fix +issue: 4548 +title: "Simultaneous DELETE and $reindex operations could corrupt the search index. This has been fixed." diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java index 3d6377dc85e..0642f9f2158 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/dao/BaseHapiFhirResourceDao.java @@ -133,6 +133,7 @@ import org.springframework.transaction.support.TransactionTemplate; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.PostConstruct; +import javax.persistence.LockModeType; import javax.persistence.NoResultException; import javax.persistence.TypedQuery; import javax.servlet.http.HttpServletResponse; @@ -1222,13 +1223,19 @@ public abstract class BaseHapiFhirResourceDao extends B @Override public void reindex(IResourcePersistentId thePid, RequestDetails theRequest, TransactionDetails theTransactionDetails) { JpaPid jpaPid = (JpaPid) thePid; - Optional entityOpt = myResourceTableDao.findById(jpaPid.getId()); - if (!entityOpt.isPresent()) { + + // Careful! Reindex only reads ResourceTable, but we tell Hibernate to check version + // to ensure Hibernate will catch concurrent updates (PUT/DELETE) elsewhere. + // Otherwise, we may index stale data. See #4584 + // We use the main entity as the lock object since all the index rows hang off it. 
+ ResourceTable entity = + myEntityManager.find(ResourceTable.class, jpaPid.getId(), LockModeType.OPTIMISTIC); + + if (entity == null) { ourLog.warn("Unable to find entity with PID: {}", jpaPid.getId()); return; } - ResourceTable entity = entityOpt.get(); try { T resource = (T) myJpaStorageResourceParser.toResource(entity, false); reindex(resource, entity); diff --git a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/extractor/BaseSearchParamExtractor.java b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/extractor/BaseSearchParamExtractor.java index e11d1b787d9..c3f3593cbde 100644 --- a/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/extractor/BaseSearchParamExtractor.java +++ b/hapi-fhir-jpaserver-searchparam/src/main/java/ca/uhn/fhir/jpa/searchparam/extractor/BaseSearchParamExtractor.java @@ -1945,6 +1945,7 @@ public abstract class BaseSearchParamExtractor implements ISearchParamExtractor IPrimitiveType nextBaseDateTime = (IPrimitiveType) theValue; if (nextBaseDateTime.getValue() != null) { myIndexedSearchParamDate = new ResourceIndexedSearchParamDate(myPartitionSettings, theResourceType, theSearchParam.getName(), nextBaseDateTime.getValue(), nextBaseDateTime.getValueAsString(), nextBaseDateTime.getValue(), nextBaseDateTime.getValueAsString(), nextBaseDateTime.getValueAsString()); + ourLog.trace("DateExtractor - extracted {} for {}", nextBaseDateTime, theSearchParam.getName()); theParams.add(myIndexedSearchParamDate); } } diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java index 5a23c780ba4..fb4091a8279 100644 --- a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/reindex/ReindexStepTest.java @@ -61,7 +61,7 @@ public class ReindexStepTest extends BaseJpaR4Test { // Verify assertEquals(2, outcome.getRecordsProcessed()); - assertEquals(4, myCaptureQueriesListener.logSelectQueries().size()); + assertEquals(6, myCaptureQueriesListener.logSelectQueries().size()); assertEquals(0, myCaptureQueriesListener.countInsertQueries()); assertEquals(0, myCaptureQueriesListener.countUpdateQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); @@ -91,7 +91,7 @@ public class ReindexStepTest extends BaseJpaR4Test { // Verify assertEquals(2, outcome.getRecordsProcessed()); - assertEquals(6, myCaptureQueriesListener.logSelectQueries().size()); + assertEquals(8, myCaptureQueriesListener.logSelectQueries().size()); assertEquals(0, myCaptureQueriesListener.countInsertQueries()); assertEquals(0, myCaptureQueriesListener.countUpdateQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); @@ -124,7 +124,7 @@ public class ReindexStepTest extends BaseJpaR4Test { // Verify assertEquals(2, outcome.getRecordsProcessed()); - assertEquals(4, myCaptureQueriesListener.logSelectQueries().size()); + assertEquals(6, myCaptureQueriesListener.logSelectQueries().size()); // name, family, phonetic, deceased, active assertEquals(5, myCaptureQueriesListener.countInsertQueries()); assertEquals(0, myCaptureQueriesListener.countUpdateQueries()); @@ -192,7 +192,7 @@ public class ReindexStepTest extends BaseJpaR4Test { // Verify assertEquals(2, outcome.getRecordsProcessed()); - assertEquals(8, myCaptureQueriesListener.logSelectQueries().size()); + assertEquals(10, 
myCaptureQueriesListener.logSelectQueries().size()); assertEquals(0, myCaptureQueriesListener.countInsertQueries()); assertEquals(4, myCaptureQueriesListener.countUpdateQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); @@ -237,7 +237,7 @@ public class ReindexStepTest extends BaseJpaR4Test { // Verify assertEquals(4, outcome.getRecordsProcessed()); - assertEquals(5, myCaptureQueriesListener.logSelectQueries().size()); + assertEquals(9, myCaptureQueriesListener.logSelectQueries().size()); assertEquals(5, myCaptureQueriesListener.countInsertQueries()); assertEquals(2, myCaptureQueriesListener.countUpdateQueries()); assertEquals(0, myCaptureQueriesListener.countDeleteQueries()); diff --git a/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/search/reindex/ReindexRaceBugTest.java b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/search/reindex/ReindexRaceBugTest.java new file mode 100644 index 00000000000..ca920975a30 --- /dev/null +++ b/hapi-fhir-jpaserver-test-r4/src/test/java/ca/uhn/fhir/jpa/search/reindex/ReindexRaceBugTest.java @@ -0,0 +1,192 @@ +package ca.uhn.fhir.jpa.search.reindex; + +import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome; +import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService; +import ca.uhn.fhir.jpa.model.dao.JpaPid; +import ca.uhn.fhir.jpa.test.BaseJpaR4Test; +import ca.uhn.fhir.rest.api.server.RequestDetails; +import ca.uhn.fhir.rest.api.server.SystemRequestDetails; +import ca.uhn.fhir.rest.api.server.storage.TransactionDetails; +import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException; +import ca.uhn.fhir.storage.test.DaoTestDataBuilder; +import ca.uhn.test.concurrency.LockstepEnumPhaser; +import org.hl7.fhir.instance.model.api.IIdType; +import org.hl7.fhir.r4.model.Observation; +import org.hl7.fhir.r4.model.SearchParameter; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Propagation; + +import javax.annotation.Nonnull; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.function.BiFunction; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@ContextConfiguration(classes = { + DaoTestDataBuilder.Config.class +}) +class ReindexRaceBugTest extends BaseJpaR4Test { + private static final Logger ourLog = LoggerFactory.getLogger(ReindexRaceBugTest.class); + @Autowired + HapiTransactionService myHapiTransactionService; + + enum Steps { + STARTING, RUN_REINDEX, RUN_DELETE, COMMIT_REINDEX, FINISHED + } + + /** + * Simulate reindex job step overlapping with resource deletion. + * The $reindex step processes several resources in a single tx. + * The tested sequence here is: job step $reindexes a resouce, then another thread DELETEs the resource, + * then later, the $reindex step finishes the rest of the resources and commits AFTER the DELETE commits. + * + * This was inserting new index rows into HFJ_SPIDX_TOKEN even though the resource was gone. + * This is an illegal state for our index. 
Deleted resources should never have content in HFJ_SPIDX_* + */ + @Test + void deleteOverlapsWithReindex_leavesIndexRowsP() throws InterruptedException, ExecutionException { + LockstepEnumPhaser phaser = new LockstepEnumPhaser<>(2, Steps.class); + + ourLog.info("An observation is created"); + + var observationCreateOutcome = callInFreshTx((tx, rd) -> { + Observation o = (Observation) buildResource("Observation", withEffectiveDate("2021-01-01T00:00:00")); + return myObservationDao.create(o, rd); + }); + IIdType observationId = observationCreateOutcome.getId().toVersionless(); + long observationPid = Long.parseLong(observationCreateOutcome.getId().getIdPart()); + + assertEquals(1, getSPIDXDateCount(observationPid), "date index row for date"); + + ourLog.info("Then a SP is created after that matches data in the Observation"); + SearchParameter sp = myFhirContext.newJsonParser().parseResource(SearchParameter.class, """ + { + "resourceType": "SearchParameter", + "id": "observation-date2", + "status": "active", + "code": "date2", + "name": "date2", + "base": [ "Observation" ], + "type": "date", + "expression": "Observation.effective" + } + """); + this.myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(false); + callInFreshTx((tx, rd) -> { + DaoMethodOutcome result = mySearchParameterDao.update(sp, rd); + mySearchParamRegistry.forceRefresh(); + return result; + }); + + assertEquals(1, getSPIDXDateCount(observationPid), "still only one index row before reindex"); + + + // suppose reindex job step starts here and loads the resource and ResourceTable entity + ThreadFactory loggingThreadFactory = getLoggingThreadFactory("Reindex-thread"); + ExecutorService backgroundReindexThread = Executors.newSingleThreadExecutor(loggingThreadFactory); + Future backgroundResult = backgroundReindexThread.submit(() -> { + try { + callInFreshTx((tx, rd) -> { + try { + ourLog.info("Starting background $reindex"); + phaser.arriveAndAwaitSharedEndOf(Steps.STARTING); + + phaser.assertInPhase(Steps.RUN_REINDEX); + ourLog.info("Run $reindex"); + myObservationDao.reindex(JpaPid.fromIdAndResourceType(observationPid, "Observation"), rd, new TransactionDetails()); + + ourLog.info("$reindex done release main thread to delete"); + phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX); + + ourLog.info("Wait for delete to finish"); + phaser.arriveAndAwaitSharedEndOf(Steps.RUN_DELETE); + + phaser.assertInPhase(Steps.COMMIT_REINDEX); + ourLog.info("Commit $reindex now that delete is finished"); + // commit happens here at end of block + } catch (Exception e) { + ourLog.error("$reindex thread failed", e); + } + return 0; + }); + } finally { + phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX); + } + return 1; + }); + + ourLog.info("Wait for $reindex to start"); + phaser.arriveAndAwaitSharedEndOf(Steps.STARTING); + + phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX); + + // then the resource is deleted + phaser.assertInPhase(Steps.RUN_DELETE); + + ourLog.info("Deleting observation"); + callInFreshTx((tx, rd) -> myObservationDao.delete(observationId, rd)); + assertResourceDeleted(observationId); + assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should have 0 index rows"); + + ourLog.info("Let $reindex commit"); + phaser.arriveAndAwaitSharedEndOf(Steps.RUN_DELETE); + + // then the reindex call finishes + ourLog.info("Await $reindex commit"); + phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX); + + assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should still 
have 0 index rows, after $reindex completes"); + } + + @Nonnull + static ThreadFactory getLoggingThreadFactory(String theThreadName) { + ThreadFactory loggingThreadFactory = r -> new Thread(() -> { + boolean success = false; + try { + r.run(); + success = true; + } finally { + if (!success) { + ourLog.error("Background thread failed"); + } + } + }, theThreadName); + return loggingThreadFactory; + } + + void assertResourceDeleted(IIdType observationId) { + try { + // confirm deleted + callInFreshTx((tx, rd)-> + myObservationDao.read(observationId, rd, false)); + fail("Read deleted resource"); + } catch (ResourceGoneException e) { + // expected + } + } + + T callInFreshTx(BiFunction theCallback) { + SystemRequestDetails requestDetails = new SystemRequestDetails(); + return myHapiTransactionService.withRequest(requestDetails) + .withTransactionDetails(new TransactionDetails()) + .withPropagation(Propagation.REQUIRES_NEW) + .execute(tx-> theCallback.apply(tx, requestDetails)); + } + + + int getSPIDXDateCount(long observationPid) { + return callInFreshTx((rd, tx) -> + myResourceIndexedSearchParamDateDao.findAllForResourceId(observationPid).size()); + } + +} diff --git a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java index 717fac3d358..5a8567dad00 100644 --- a/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java +++ b/hapi-fhir-jpaserver-test-utilities/src/main/java/ca/uhn/fhir/jpa/test/BaseJpaR4Test.java @@ -103,6 +103,7 @@ import ca.uhn.fhir.rest.api.Constants; import ca.uhn.fhir.rest.api.EncodingEnum; import ca.uhn.fhir.rest.server.BasePagingProvider; import ca.uhn.fhir.rest.server.provider.ResourceProviderFactory; +import ca.uhn.fhir.storage.test.DaoTestDataBuilder; import ca.uhn.fhir.test.utilities.ITestDataBuilder; import ca.uhn.fhir.util.UrlUtil; import ca.uhn.fhir.validation.FhirValidator; diff --git a/hapi-fhir-test-utilities/pom.xml b/hapi-fhir-test-utilities/pom.xml index 2cbf65827a3..ab366500783 100644 --- a/hapi-fhir-test-utilities/pom.xml +++ b/hapi-fhir-test-utilities/pom.xml @@ -182,10 +182,16 @@ - - org.mockito - mockito-core - + + org.mockito + mockito-core + + + com.github.seregamorph + hamcrest-more-matchers + 0.1 + test + diff --git a/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/LockstepEnumPhaser.java b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/LockstepEnumPhaser.java new file mode 100644 index 00000000000..fb97fcef2fb --- /dev/null +++ b/hapi-fhir-test-utilities/src/main/java/ca/uhn/test/concurrency/LockstepEnumPhaser.java @@ -0,0 +1,104 @@ +package ca.uhn.test.concurrency; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Phaser; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Test helper to force a particular sequence on 2 or more threads. + * Wraps Phaser with an Enum for better messages, and some test support. + * The only use is to impose a particular execution sequence over multiple threads when reproducing bugs. + * + * The simplest usage is to declare the number of collaborators as theParticipantCount + * in the constructor, and then have each participant thread call {@link #arriveAndAwaitSharedEndOf} + * as they finish the work of every phase. + * Every thread needs to confirm, even if they do no work in that phase. + *
* <p>
+ * Note: this is just a half-baked wrapper around Phaser. + * The behaviour is not especially precise, or tested. Comments welcome: MB. + * + * @param an enum used to name the phases. + */ +public class LockstepEnumPhaser> { + private static final Logger ourLog = LoggerFactory.getLogger(LockstepEnumPhaser.class); + final Phaser myPhaser; + final Class myEnumClass; + final E[] myEnumConstants; + + public LockstepEnumPhaser(int theParticipantCount, Class theEnumClass) { + myPhaser = new Phaser(theParticipantCount); + myEnumClass = theEnumClass; + myEnumConstants = myEnumClass.getEnumConstants(); + } + + public E arrive() { + E result = phaseToEnum(myPhaser.arrive()); + ourLog.info("Arrive in phase {}", result); + return result; + } + + public void assertInPhase(E theStageEnum) { + assertEquals(theStageEnum, getPhase(), "In stage " + theStageEnum); + } + + public E getPhase() { + return phaseToEnum(myPhaser.getPhase()); + } + + public E awaitAdvance(E thePhase) { + checkAwait(thePhase); + return doAwait(thePhase); + } + + /** + * Like arrive(), but verify stage first + */ + public E arriveAtMyEndOf(E thePhase) { + assertInPhase(thePhase); + return arrive(); + } + + public E arriveAndAwaitSharedEndOf(E thePhase) { + checkAwait(thePhase); + arrive(); + return doAwait(thePhase); + } + + public E arriveAndDeregister() { + return phaseToEnum(myPhaser.arriveAndDeregister()); + } + + public E register() { + return phaseToEnum(myPhaser.register()); + } + + private E doAwait(E thePhase) { + ourLog.debug("Start doAwait - {}", thePhase); + E phase = phaseToEnum(myPhaser.awaitAdvance(thePhase.ordinal())); + ourLog.info("Finish doAwait - {}", thePhase); + return phase; + } + + private void checkAwait(E thePhase) { + E currentPhase = getPhase(); + if (currentPhase.ordinal() < thePhase.ordinal()) { + fail("Can't wait for end of phase " + thePhase + ", still in phase " + currentPhase); + } else if (currentPhase.ordinal() > thePhase.ordinal()) { + ourLog.warn("Skip waiting for phase {}, already in phase {}", thePhase, currentPhase); + } + } + + + private E phaseToEnum(int resultOrdinal) { + if (resultOrdinal >= myEnumConstants.length) { + throw new IllegalStateException("Enum " + myEnumClass.getName() + " should declare one more enum value for post-completion reporting of phase " + resultOrdinal); + } + return myEnumConstants[resultOrdinal]; + } + + +} diff --git a/hapi-fhir-test-utilities/src/test/java/ca/uhn/test/concurrency/LockstepEnumPhaserTest.java b/hapi-fhir-test-utilities/src/test/java/ca/uhn/test/concurrency/LockstepEnumPhaserTest.java new file mode 100644 index 00000000000..8a4cafce706 --- /dev/null +++ b/hapi-fhir-test-utilities/src/test/java/ca/uhn/test/concurrency/LockstepEnumPhaserTest.java @@ -0,0 +1,190 @@ +package ca.uhn.test.concurrency; + +import com.github.seregamorph.hamcrest.OrderMatchers; +import org.apache.commons.lang3.tuple.Pair; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.FINISHED; +import static 
ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.ONE; +import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.THREE; +import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.TWO; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class LockstepEnumPhaserTest { + private static final Logger ourLog = LoggerFactory.getLogger(LockstepEnumPhaserTest.class); + final ExecutorService myExecutorService = Executors.newFixedThreadPool(10); + final List> myProgressEvents = Collections.synchronizedList(new ArrayList<>()); + /** Compare progress records by stage */ + final Comparator> myProgressStageComparator = Comparator.comparing(Pair::getRight); + + enum Stages { + ONE, TWO, THREE, FINISHED + } + + LockstepEnumPhaser myPhaser; + + @Timeout(1) + @Test + void phaserWithOnePariticpant_worksFine() { + // given + myPhaser = new LockstepEnumPhaser<>(1, Stages.class); + + myPhaser.assertInPhase(ONE); + + myPhaser.arriveAtMyEndOf(ONE); + + myPhaser.arriveAndAwaitSharedEndOf(TWO); + + myPhaser.arriveAndAwaitSharedEndOf(THREE); + + myPhaser.assertInPhase(FINISHED); + } + + @Timeout(5) + @Test + void phaserWithTwoThreads_runsInLockStep() throws InterruptedException, ExecutionException { + // given + myPhaser = new LockstepEnumPhaser<>(2, Stages.class); + + // run two copies of the same schedule + AtomicInteger i = new AtomicInteger(0); + Callable schedule = ()->{ + // get unique ids for each thread. + int threadId = i.getAndIncrement(); + + myPhaser.assertInPhase(ONE); + ourLog.info("Starting"); + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(ONE); + + myPhaser.assertInPhase(TWO); + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(TWO); + + myPhaser.assertInPhase(THREE); + recordProgress(threadId); + + Stages nextStage = myPhaser.awaitAdvance(TWO); + assertEquals(THREE, nextStage); + + myPhaser.arriveAtMyEndOf(THREE); + + ourLog.info("Finished"); + + return 1; + }; + Future result1 = myExecutorService.submit(schedule); + Future result2 = myExecutorService.submit(schedule); + + assertEquals(1, result1.get()); + assertEquals(1, result2.get()); + assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator)); + assertThat("all progress logged", myProgressEvents, Matchers.hasSize(6)); + } + + private void recordProgress(int threadId) { + myProgressEvents.add(Pair.of(threadId, myPhaser.getPhase())); + } + + @Timeout(5) + @Test + void phaserWithTwoThreads_canAddThird_sequencContinues() throws InterruptedException, ExecutionException { + // given + myPhaser = new LockstepEnumPhaser<>(2, Stages.class); + + // run one simple schedule + Callable schedule1 = ()->{ + int threadId = 1; + ourLog.info("Starting schedule1"); + myPhaser.assertInPhase(ONE); + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(ONE); + + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(TWO); + + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(THREE); + + ourLog.info("Finished schedule1"); + + return 1; + }; + // this schedule will start half-way in + Callable schedule2 = ()->{ + int threadId = 2; + ourLog.info("Starting schedule2"); + + myPhaser.assertInPhase(TWO); + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(TWO); + + myPhaser.assertInPhase(THREE); + + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(THREE); + + ourLog.info("Finished schedule2"); + + return 1; + }; + // this schedule will 
start schedule 2 half-way + Callable schedule3 = ()->{ + int threadId = 3; + myPhaser.assertInPhase(ONE); + ourLog.info("Starting schedule3"); + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(ONE); + + recordProgress(threadId); + + // add a new thread to the mix + myPhaser.register(); // tell the phaser to expect one more + myExecutorService.submit(schedule2); + + myPhaser.arriveAndAwaitSharedEndOf(TWO); + + recordProgress(threadId); + + myPhaser.arriveAndAwaitSharedEndOf(THREE); + + ourLog.info("Finished schedule3"); + + return 1; + }; + Future result1 = myExecutorService.submit(schedule1); + Future result2 = myExecutorService.submit(schedule3); + + assertEquals(1, result1.get()); + assertEquals(1, result2.get()); + + assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator)); + assertThat("all progress logged", myProgressEvents, Matchers.hasSize(8)); + + } + +}
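
Editor's sketch of the core locking change: loading the ResourceTable with LockModeType.OPTIMISTIC makes Hibernate re-check the entity's version column with one extra SELECT at commit time, so a $reindex transaction that raced with a PUT or DELETE rolls back instead of persisting index rows for a stale or deleted resource. That extra SELECT per resource is also why the expected select counts in ReindexStepTest above rise by one per reindexed resource. The following is a minimal, self-contained illustration of the same technique against a generic @Version-ed entity; the class, entity, and field names are illustrative stand-ins, not HAPI FHIR code.

import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.Id;
import javax.persistence.LockModeType;
import javax.persistence.Version;

// Stand-in for HAPI's ResourceTable: any entity with an @Version column behaves the same way.
@Entity
class IndexedResource {
	@Id
	Long myId;

	@Version
	long myVersion; // re-checked at commit when LockModeType.OPTIMISTIC was requested

	String myContent;
}

class OptimisticReindexSketch {

	private final EntityManager myEntityManager;

	OptimisticReindexSketch(EntityManager theEntityManager) {
		myEntityManager = theEntityManager;
	}

	/** Must run inside an active transaction. */
	void reindex(Long thePid) {
		// OPTIMISTIC does not block concurrent writers. It records the version read here
		// and issues one extra SELECT just before commit to verify the row is unchanged
		// (the "+1 query to every resource reindex" from the commit message).
		IndexedResource entity = myEntityManager.find(IndexedResource.class, thePid, LockModeType.OPTIMISTIC);
		if (entity == null) {
			return; // already deleted - nothing to index
		}

		// ... recompute and write the search-index rows derived from 'entity' here ...

		// If a concurrent UPDATE or DELETE commits after the find() above, the commit-time
		// version check fails with an optimistic-locking exception
		// (javax.persistence.OptimisticLockException or a provider-specific wrapper)
		// and this transaction rolls back, so no index rows are left behind for a row
		// that no longer exists.
	}
}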
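
The LockstepEnumPhaser added in hapi-fhir-test-utilities is what makes the race reproducible: it wraps java.util.concurrent.Phaser so each phase is named by an enum constant, and every participant must call arriveAndAwaitSharedEndOf at the end of every phase, even phases in which it does no work. A compact two-participant usage sketch, assuming only the constructor, assertInPhase, and arriveAndAwaitSharedEndOf shown in the patch; the Steps names and the work placeholders are invented for illustration.

import ca.uhn.test.concurrency.LockstepEnumPhaser;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LockstepUsageSketch {

	// Declare one more constant than you have working phases: the phaser reports
	// the post-completion phase as the final constant (FINISHED here).
	enum Steps { SETUP, RACE, VERIFY, FINISHED }

	public static void main(String[] args) throws Exception {
		LockstepEnumPhaser<Steps> phaser = new LockstepEnumPhaser<>(2, Steps.class);
		ExecutorService pool = Executors.newFixedThreadPool(1);

		Future<?> background = pool.submit(() -> {
			phaser.assertInPhase(Steps.SETUP);
			// ... background work that must happen during SETUP ...
			phaser.arriveAndAwaitSharedEndOf(Steps.SETUP);

			// ... background work that must happen during RACE ...
			phaser.arriveAndAwaitSharedEndOf(Steps.RACE);

			// no work in VERIFY, but every participant still has to arrive
			phaser.arriveAndAwaitSharedEndOf(Steps.VERIFY);
			return null;
		});

		// main thread is the second participant
		phaser.arriveAndAwaitSharedEndOf(Steps.SETUP);
		// ... main-thread work that is guaranteed to run inside RACE ...
		phaser.arriveAndAwaitSharedEndOf(Steps.RACE);
		// ... assertions that run inside VERIFY ...
		phaser.arriveAndAwaitSharedEndOf(Steps.VERIFY);

		phaser.assertInPhase(Steps.FINISHED);
		background.get(); // surfaces any assertion failure from the background thread
		pool.shutdown();
	}
}

Because both threads block at each shared end-of-phase, the interleaving is deterministic, which is what ReindexRaceBugTest relies on to commit the DELETE strictly between the $reindex read and the $reindex commit.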
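
LockstepEnumPhaserTest also covers a participant joining mid-sequence: an existing participant calls register() to raise the party count just before spawning the newcomer, and the newcomer asserts the phase it expects to wake up in. A trimmed-down sketch of that hand-off using the same API; the executor setup and stage names are illustrative.

import ca.uhn.test.concurrency.LockstepEnumPhaser;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LateJoinSketch {

	enum Stages { ONE, TWO, THREE, FINISHED }

	public static void main(String[] args) {
		// Start with a single registered participant.
		LockstepEnumPhaser<Stages> phaser = new LockstepEnumPhaser<>(1, Stages.class);
		ExecutorService pool = Executors.newFixedThreadPool(1);

		phaser.arriveAndAwaitSharedEndOf(Stages.ONE);

		// Before spawning the newcomer, the existing participant bumps the
		// party count so the phaser now waits for two arrivals per phase.
		phaser.register();
		pool.submit(() -> {
			// The late joiner starts in whatever phase is current - assert it.
			phaser.assertInPhase(Stages.TWO);
			phaser.arriveAndAwaitSharedEndOf(Stages.TWO);
			phaser.arriveAndAwaitSharedEndOf(Stages.THREE);
		});

		phaser.arriveAndAwaitSharedEndOf(Stages.TWO);
		phaser.arriveAndAwaitSharedEndOf(Stages.THREE);
		phaser.assertInPhase(Stages.FINISHED);
		pool.shutdown();
	}
}

This mirrors the schedule3/schedule2 pair in the test above, where the extra worker is registered during phase TWO and must check in for every remaining phase.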