Simultaneous $reindex and DELETE can orphan index rows (#4584)
Add optimistic write lock to $reindex operation to prevent indexing stale data. Introduce Lockstep Phaser to make testing easier Add 1 query to every resource reindex :-(
This commit is contained in:
parent
b25f364369
commit
cedf69516b
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
type: fix
|
||||||
|
issue: 4548
|
||||||
|
title: "Simultaneous DELETE and $reindex operations could corrupt the search index. This has been fixed."
|
|
@ -133,6 +133,7 @@ import org.springframework.transaction.support.TransactionTemplate;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.persistence.LockModeType;
|
||||||
import javax.persistence.NoResultException;
|
import javax.persistence.NoResultException;
|
||||||
import javax.persistence.TypedQuery;
|
import javax.persistence.TypedQuery;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
@ -1222,13 +1223,19 @@ public abstract class BaseHapiFhirResourceDao<T extends IBaseResource> extends B
|
||||||
@Override
|
@Override
|
||||||
public void reindex(IResourcePersistentId thePid, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
|
public void reindex(IResourcePersistentId thePid, RequestDetails theRequest, TransactionDetails theTransactionDetails) {
|
||||||
JpaPid jpaPid = (JpaPid) thePid;
|
JpaPid jpaPid = (JpaPid) thePid;
|
||||||
Optional<ResourceTable> entityOpt = myResourceTableDao.findById(jpaPid.getId());
|
|
||||||
if (!entityOpt.isPresent()) {
|
// Careful! Reindex only reads ResourceTable, but we tell Hibernate to check version
|
||||||
|
// to ensure Hibernate will catch concurrent updates (PUT/DELETE) elsewhere.
|
||||||
|
// Otherwise, we may index stale data. See #4584
|
||||||
|
// We use the main entity as the lock object since all the index rows hang off it.
|
||||||
|
ResourceTable entity =
|
||||||
|
myEntityManager.find(ResourceTable.class, jpaPid.getId(), LockModeType.OPTIMISTIC);
|
||||||
|
|
||||||
|
if (entity == null) {
|
||||||
ourLog.warn("Unable to find entity with PID: {}", jpaPid.getId());
|
ourLog.warn("Unable to find entity with PID: {}", jpaPid.getId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ResourceTable entity = entityOpt.get();
|
|
||||||
try {
|
try {
|
||||||
T resource = (T) myJpaStorageResourceParser.toResource(entity, false);
|
T resource = (T) myJpaStorageResourceParser.toResource(entity, false);
|
||||||
reindex(resource, entity);
|
reindex(resource, entity);
|
||||||
|
|
|
@ -1945,6 +1945,7 @@ public abstract class BaseSearchParamExtractor implements ISearchParamExtractor
|
||||||
IPrimitiveType<Date> nextBaseDateTime = (IPrimitiveType<Date>) theValue;
|
IPrimitiveType<Date> nextBaseDateTime = (IPrimitiveType<Date>) theValue;
|
||||||
if (nextBaseDateTime.getValue() != null) {
|
if (nextBaseDateTime.getValue() != null) {
|
||||||
myIndexedSearchParamDate = new ResourceIndexedSearchParamDate(myPartitionSettings, theResourceType, theSearchParam.getName(), nextBaseDateTime.getValue(), nextBaseDateTime.getValueAsString(), nextBaseDateTime.getValue(), nextBaseDateTime.getValueAsString(), nextBaseDateTime.getValueAsString());
|
myIndexedSearchParamDate = new ResourceIndexedSearchParamDate(myPartitionSettings, theResourceType, theSearchParam.getName(), nextBaseDateTime.getValue(), nextBaseDateTime.getValueAsString(), nextBaseDateTime.getValue(), nextBaseDateTime.getValueAsString(), nextBaseDateTime.getValueAsString());
|
||||||
|
ourLog.trace("DateExtractor - extracted {} for {}", nextBaseDateTime, theSearchParam.getName());
|
||||||
theParams.add(myIndexedSearchParamDate);
|
theParams.add(myIndexedSearchParamDate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Verify
|
// Verify
|
||||||
assertEquals(2, outcome.getRecordsProcessed());
|
assertEquals(2, outcome.getRecordsProcessed());
|
||||||
assertEquals(4, myCaptureQueriesListener.logSelectQueries().size());
|
assertEquals(6, myCaptureQueriesListener.logSelectQueries().size());
|
||||||
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
|
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
|
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
||||||
|
@ -91,7 +91,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Verify
|
// Verify
|
||||||
assertEquals(2, outcome.getRecordsProcessed());
|
assertEquals(2, outcome.getRecordsProcessed());
|
||||||
assertEquals(6, myCaptureQueriesListener.logSelectQueries().size());
|
assertEquals(8, myCaptureQueriesListener.logSelectQueries().size());
|
||||||
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
|
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
|
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
||||||
|
@ -124,7 +124,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Verify
|
// Verify
|
||||||
assertEquals(2, outcome.getRecordsProcessed());
|
assertEquals(2, outcome.getRecordsProcessed());
|
||||||
assertEquals(4, myCaptureQueriesListener.logSelectQueries().size());
|
assertEquals(6, myCaptureQueriesListener.logSelectQueries().size());
|
||||||
// name, family, phonetic, deceased, active
|
// name, family, phonetic, deceased, active
|
||||||
assertEquals(5, myCaptureQueriesListener.countInsertQueries());
|
assertEquals(5, myCaptureQueriesListener.countInsertQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
|
assertEquals(0, myCaptureQueriesListener.countUpdateQueries());
|
||||||
|
@ -192,7 +192,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Verify
|
// Verify
|
||||||
assertEquals(2, outcome.getRecordsProcessed());
|
assertEquals(2, outcome.getRecordsProcessed());
|
||||||
assertEquals(8, myCaptureQueriesListener.logSelectQueries().size());
|
assertEquals(10, myCaptureQueriesListener.logSelectQueries().size());
|
||||||
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
|
assertEquals(0, myCaptureQueriesListener.countInsertQueries());
|
||||||
assertEquals(4, myCaptureQueriesListener.countUpdateQueries());
|
assertEquals(4, myCaptureQueriesListener.countUpdateQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
||||||
|
@ -237,7 +237,7 @@ public class ReindexStepTest extends BaseJpaR4Test {
|
||||||
|
|
||||||
// Verify
|
// Verify
|
||||||
assertEquals(4, outcome.getRecordsProcessed());
|
assertEquals(4, outcome.getRecordsProcessed());
|
||||||
assertEquals(5, myCaptureQueriesListener.logSelectQueries().size());
|
assertEquals(9, myCaptureQueriesListener.logSelectQueries().size());
|
||||||
assertEquals(5, myCaptureQueriesListener.countInsertQueries());
|
assertEquals(5, myCaptureQueriesListener.countInsertQueries());
|
||||||
assertEquals(2, myCaptureQueriesListener.countUpdateQueries());
|
assertEquals(2, myCaptureQueriesListener.countUpdateQueries());
|
||||||
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
assertEquals(0, myCaptureQueriesListener.countDeleteQueries());
|
||||||
|
|
|
@ -0,0 +1,192 @@
|
||||||
|
package ca.uhn.fhir.jpa.search.reindex;
|
||||||
|
|
||||||
|
import ca.uhn.fhir.jpa.api.model.DaoMethodOutcome;
|
||||||
|
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
|
||||||
|
import ca.uhn.fhir.jpa.model.dao.JpaPid;
|
||||||
|
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
|
||||||
|
import ca.uhn.fhir.rest.api.server.RequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
|
||||||
|
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
|
||||||
|
import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
|
||||||
|
import ca.uhn.fhir.storage.test.DaoTestDataBuilder;
|
||||||
|
import ca.uhn.test.concurrency.LockstepEnumPhaser;
|
||||||
|
import org.hl7.fhir.instance.model.api.IIdType;
|
||||||
|
import org.hl7.fhir.r4.model.Observation;
|
||||||
|
import org.hl7.fhir.r4.model.SearchParameter;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.transaction.TransactionStatus;
|
||||||
|
import org.springframework.transaction.annotation.Propagation;
|
||||||
|
|
||||||
|
import javax.annotation.Nonnull;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
|
@ContextConfiguration(classes = {
|
||||||
|
DaoTestDataBuilder.Config.class
|
||||||
|
})
|
||||||
|
class ReindexRaceBugTest extends BaseJpaR4Test {
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(ReindexRaceBugTest.class);
|
||||||
|
@Autowired
|
||||||
|
HapiTransactionService myHapiTransactionService;
|
||||||
|
|
||||||
|
enum Steps {
|
||||||
|
STARTING, RUN_REINDEX, RUN_DELETE, COMMIT_REINDEX, FINISHED
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulate reindex job step overlapping with resource deletion.
|
||||||
|
* The $reindex step processes several resources in a single tx.
|
||||||
|
* The tested sequence here is: job step $reindexes a resouce, then another thread DELETEs the resource,
|
||||||
|
* then later, the $reindex step finishes the rest of the resources and commits AFTER the DELETE commits.
|
||||||
|
*
|
||||||
|
* This was inserting new index rows into HFJ_SPIDX_TOKEN even though the resource was gone.
|
||||||
|
* This is an illegal state for our index. Deleted resources should never have content in HFJ_SPIDX_*
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
void deleteOverlapsWithReindex_leavesIndexRowsP() throws InterruptedException, ExecutionException {
|
||||||
|
LockstepEnumPhaser<Steps> phaser = new LockstepEnumPhaser<>(2, Steps.class);
|
||||||
|
|
||||||
|
ourLog.info("An observation is created");
|
||||||
|
|
||||||
|
var observationCreateOutcome = callInFreshTx((tx, rd) -> {
|
||||||
|
Observation o = (Observation) buildResource("Observation", withEffectiveDate("2021-01-01T00:00:00"));
|
||||||
|
return myObservationDao.create(o, rd);
|
||||||
|
});
|
||||||
|
IIdType observationId = observationCreateOutcome.getId().toVersionless();
|
||||||
|
long observationPid = Long.parseLong(observationCreateOutcome.getId().getIdPart());
|
||||||
|
|
||||||
|
assertEquals(1, getSPIDXDateCount(observationPid), "date index row for date");
|
||||||
|
|
||||||
|
ourLog.info("Then a SP is created after that matches data in the Observation");
|
||||||
|
SearchParameter sp = myFhirContext.newJsonParser().parseResource(SearchParameter.class, """
|
||||||
|
{
|
||||||
|
"resourceType": "SearchParameter",
|
||||||
|
"id": "observation-date2",
|
||||||
|
"status": "active",
|
||||||
|
"code": "date2",
|
||||||
|
"name": "date2",
|
||||||
|
"base": [ "Observation" ],
|
||||||
|
"type": "date",
|
||||||
|
"expression": "Observation.effective"
|
||||||
|
}
|
||||||
|
""");
|
||||||
|
this.myStorageSettings.setMarkResourcesForReindexingUponSearchParameterChange(false);
|
||||||
|
callInFreshTx((tx, rd) -> {
|
||||||
|
DaoMethodOutcome result = mySearchParameterDao.update(sp, rd);
|
||||||
|
mySearchParamRegistry.forceRefresh();
|
||||||
|
return result;
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(1, getSPIDXDateCount(observationPid), "still only one index row before reindex");
|
||||||
|
|
||||||
|
|
||||||
|
// suppose reindex job step starts here and loads the resource and ResourceTable entity
|
||||||
|
ThreadFactory loggingThreadFactory = getLoggingThreadFactory("Reindex-thread");
|
||||||
|
ExecutorService backgroundReindexThread = Executors.newSingleThreadExecutor(loggingThreadFactory);
|
||||||
|
Future<Integer> backgroundResult = backgroundReindexThread.submit(() -> {
|
||||||
|
try {
|
||||||
|
callInFreshTx((tx, rd) -> {
|
||||||
|
try {
|
||||||
|
ourLog.info("Starting background $reindex");
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.STARTING);
|
||||||
|
|
||||||
|
phaser.assertInPhase(Steps.RUN_REINDEX);
|
||||||
|
ourLog.info("Run $reindex");
|
||||||
|
myObservationDao.reindex(JpaPid.fromIdAndResourceType(observationPid, "Observation"), rd, new TransactionDetails());
|
||||||
|
|
||||||
|
ourLog.info("$reindex done release main thread to delete");
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX);
|
||||||
|
|
||||||
|
ourLog.info("Wait for delete to finish");
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.RUN_DELETE);
|
||||||
|
|
||||||
|
phaser.assertInPhase(Steps.COMMIT_REINDEX);
|
||||||
|
ourLog.info("Commit $reindex now that delete is finished");
|
||||||
|
// commit happens here at end of block
|
||||||
|
} catch (Exception e) {
|
||||||
|
ourLog.error("$reindex thread failed", e);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
});
|
||||||
|
|
||||||
|
ourLog.info("Wait for $reindex to start");
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.STARTING);
|
||||||
|
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.RUN_REINDEX);
|
||||||
|
|
||||||
|
// then the resource is deleted
|
||||||
|
phaser.assertInPhase(Steps.RUN_DELETE);
|
||||||
|
|
||||||
|
ourLog.info("Deleting observation");
|
||||||
|
callInFreshTx((tx, rd) -> myObservationDao.delete(observationId, rd));
|
||||||
|
assertResourceDeleted(observationId);
|
||||||
|
assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should have 0 index rows");
|
||||||
|
|
||||||
|
ourLog.info("Let $reindex commit");
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.RUN_DELETE);
|
||||||
|
|
||||||
|
// then the reindex call finishes
|
||||||
|
ourLog.info("Await $reindex commit");
|
||||||
|
phaser.arriveAndAwaitSharedEndOf(Steps.COMMIT_REINDEX);
|
||||||
|
|
||||||
|
assertEquals(0, getSPIDXDateCount(observationPid), "A deleted resource should still have 0 index rows, after $reindex completes");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nonnull
|
||||||
|
static ThreadFactory getLoggingThreadFactory(String theThreadName) {
|
||||||
|
ThreadFactory loggingThreadFactory = r -> new Thread(() -> {
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
|
r.run();
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
if (!success) {
|
||||||
|
ourLog.error("Background thread failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, theThreadName);
|
||||||
|
return loggingThreadFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
void assertResourceDeleted(IIdType observationId) {
|
||||||
|
try {
|
||||||
|
// confirm deleted
|
||||||
|
callInFreshTx((tx, rd)->
|
||||||
|
myObservationDao.read(observationId, rd, false));
|
||||||
|
fail("Read deleted resource");
|
||||||
|
} catch (ResourceGoneException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
<T> T callInFreshTx(BiFunction<TransactionStatus, RequestDetails, T> theCallback) {
|
||||||
|
SystemRequestDetails requestDetails = new SystemRequestDetails();
|
||||||
|
return myHapiTransactionService.withRequest(requestDetails)
|
||||||
|
.withTransactionDetails(new TransactionDetails())
|
||||||
|
.withPropagation(Propagation.REQUIRES_NEW)
|
||||||
|
.execute(tx-> theCallback.apply(tx, requestDetails));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int getSPIDXDateCount(long observationPid) {
|
||||||
|
return callInFreshTx((rd, tx) ->
|
||||||
|
myResourceIndexedSearchParamDateDao.findAllForResourceId(observationPid).size());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -103,6 +103,7 @@ import ca.uhn.fhir.rest.api.Constants;
|
||||||
import ca.uhn.fhir.rest.api.EncodingEnum;
|
import ca.uhn.fhir.rest.api.EncodingEnum;
|
||||||
import ca.uhn.fhir.rest.server.BasePagingProvider;
|
import ca.uhn.fhir.rest.server.BasePagingProvider;
|
||||||
import ca.uhn.fhir.rest.server.provider.ResourceProviderFactory;
|
import ca.uhn.fhir.rest.server.provider.ResourceProviderFactory;
|
||||||
|
import ca.uhn.fhir.storage.test.DaoTestDataBuilder;
|
||||||
import ca.uhn.fhir.test.utilities.ITestDataBuilder;
|
import ca.uhn.fhir.test.utilities.ITestDataBuilder;
|
||||||
import ca.uhn.fhir.util.UrlUtil;
|
import ca.uhn.fhir.util.UrlUtil;
|
||||||
import ca.uhn.fhir.validation.FhirValidator;
|
import ca.uhn.fhir.validation.FhirValidator;
|
||||||
|
|
|
@ -182,10 +182,16 @@
|
||||||
</exclusion>
|
</exclusion>
|
||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.mockito</groupId>
|
<groupId>org.mockito</groupId>
|
||||||
<artifactId>mockito-core</artifactId>
|
<artifactId>mockito-core</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.seregamorph</groupId>
|
||||||
|
<artifactId>hamcrest-more-matchers</artifactId>
|
||||||
|
<version>0.1</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
package ca.uhn.test.concurrency;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.Phaser;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper to force a particular sequence on 2 or more threads.
|
||||||
|
* Wraps Phaser with an Enum for better messages, and some test support.
|
||||||
|
* The only use is to impose a particular execution sequence over multiple threads when reproducing bugs.
|
||||||
|
*
|
||||||
|
* The simplest usage is to declare the number of collaborators as theParticipantCount
|
||||||
|
* in the constructor, and then have each participant thread call {@link #arriveAndAwaitSharedEndOf}
|
||||||
|
* as they finish the work of every phase.
|
||||||
|
* Every thread needs to confirm, even if they do no work in that phase.
|
||||||
|
* <p>
|
||||||
|
* Note: this is just a half-baked wrapper around Phaser.
|
||||||
|
* The behaviour is not especially precise, or tested. Comments welcome: MB.
|
||||||
|
*
|
||||||
|
* @param <E> an enum used to name the phases.
|
||||||
|
*/
|
||||||
|
public class LockstepEnumPhaser<E extends Enum<E>> {
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(LockstepEnumPhaser.class);
|
||||||
|
final Phaser myPhaser;
|
||||||
|
final Class<E> myEnumClass;
|
||||||
|
final E[] myEnumConstants;
|
||||||
|
|
||||||
|
public LockstepEnumPhaser(int theParticipantCount, Class<E> theEnumClass) {
|
||||||
|
myPhaser = new Phaser(theParticipantCount);
|
||||||
|
myEnumClass = theEnumClass;
|
||||||
|
myEnumConstants = myEnumClass.getEnumConstants();
|
||||||
|
}
|
||||||
|
|
||||||
|
public E arrive() {
|
||||||
|
E result = phaseToEnum(myPhaser.arrive());
|
||||||
|
ourLog.info("Arrive in phase {}", result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void assertInPhase(E theStageEnum) {
|
||||||
|
assertEquals(theStageEnum, getPhase(), "In stage " + theStageEnum);
|
||||||
|
}
|
||||||
|
|
||||||
|
public E getPhase() {
|
||||||
|
return phaseToEnum(myPhaser.getPhase());
|
||||||
|
}
|
||||||
|
|
||||||
|
public E awaitAdvance(E thePhase) {
|
||||||
|
checkAwait(thePhase);
|
||||||
|
return doAwait(thePhase);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Like arrive(), but verify stage first
|
||||||
|
*/
|
||||||
|
public E arriveAtMyEndOf(E thePhase) {
|
||||||
|
assertInPhase(thePhase);
|
||||||
|
return arrive();
|
||||||
|
}
|
||||||
|
|
||||||
|
public E arriveAndAwaitSharedEndOf(E thePhase) {
|
||||||
|
checkAwait(thePhase);
|
||||||
|
arrive();
|
||||||
|
return doAwait(thePhase);
|
||||||
|
}
|
||||||
|
|
||||||
|
public E arriveAndDeregister() {
|
||||||
|
return phaseToEnum(myPhaser.arriveAndDeregister());
|
||||||
|
}
|
||||||
|
|
||||||
|
public E register() {
|
||||||
|
return phaseToEnum(myPhaser.register());
|
||||||
|
}
|
||||||
|
|
||||||
|
private E doAwait(E thePhase) {
|
||||||
|
ourLog.debug("Start doAwait - {}", thePhase);
|
||||||
|
E phase = phaseToEnum(myPhaser.awaitAdvance(thePhase.ordinal()));
|
||||||
|
ourLog.info("Finish doAwait - {}", thePhase);
|
||||||
|
return phase;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkAwait(E thePhase) {
|
||||||
|
E currentPhase = getPhase();
|
||||||
|
if (currentPhase.ordinal() < thePhase.ordinal()) {
|
||||||
|
fail("Can't wait for end of phase " + thePhase + ", still in phase " + currentPhase);
|
||||||
|
} else if (currentPhase.ordinal() > thePhase.ordinal()) {
|
||||||
|
ourLog.warn("Skip waiting for phase {}, already in phase {}", thePhase, currentPhase);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private E phaseToEnum(int resultOrdinal) {
|
||||||
|
if (resultOrdinal >= myEnumConstants.length) {
|
||||||
|
throw new IllegalStateException("Enum " + myEnumClass.getName() + " should declare one more enum value for post-completion reporting of phase " + resultOrdinal);
|
||||||
|
}
|
||||||
|
return myEnumConstants[resultOrdinal];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,190 @@
|
||||||
|
package ca.uhn.test.concurrency;
|
||||||
|
|
||||||
|
import com.github.seregamorph.hamcrest.OrderMatchers;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.FINISHED;
|
||||||
|
import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.ONE;
|
||||||
|
import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.THREE;
|
||||||
|
import static ca.uhn.test.concurrency.LockstepEnumPhaserTest.Stages.TWO;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
class LockstepEnumPhaserTest {
|
||||||
|
private static final Logger ourLog = LoggerFactory.getLogger(LockstepEnumPhaserTest.class);
|
||||||
|
final ExecutorService myExecutorService = Executors.newFixedThreadPool(10);
|
||||||
|
final List<Pair<Integer, Stages>> myProgressEvents = Collections.synchronizedList(new ArrayList<>());
|
||||||
|
/** Compare progress records by stage */
|
||||||
|
final Comparator<Pair<Integer, Stages>> myProgressStageComparator = Comparator.comparing(Pair::getRight);
|
||||||
|
|
||||||
|
enum Stages {
|
||||||
|
ONE, TWO, THREE, FINISHED
|
||||||
|
}
|
||||||
|
|
||||||
|
LockstepEnumPhaser<Stages> myPhaser;
|
||||||
|
|
||||||
|
@Timeout(1)
|
||||||
|
@Test
|
||||||
|
void phaserWithOnePariticpant_worksFine() {
|
||||||
|
// given
|
||||||
|
myPhaser = new LockstepEnumPhaser<>(1, Stages.class);
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(ONE);
|
||||||
|
|
||||||
|
myPhaser.arriveAtMyEndOf(ONE);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(TWO);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(THREE);
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(FINISHED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Timeout(5)
|
||||||
|
@Test
|
||||||
|
void phaserWithTwoThreads_runsInLockStep() throws InterruptedException, ExecutionException {
|
||||||
|
// given
|
||||||
|
myPhaser = new LockstepEnumPhaser<>(2, Stages.class);
|
||||||
|
|
||||||
|
// run two copies of the same schedule
|
||||||
|
AtomicInteger i = new AtomicInteger(0);
|
||||||
|
Callable<Integer> schedule = ()->{
|
||||||
|
// get unique ids for each thread.
|
||||||
|
int threadId = i.getAndIncrement();
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(ONE);
|
||||||
|
ourLog.info("Starting");
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(ONE);
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(TWO);
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(TWO);
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(THREE);
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
Stages nextStage = myPhaser.awaitAdvance(TWO);
|
||||||
|
assertEquals(THREE, nextStage);
|
||||||
|
|
||||||
|
myPhaser.arriveAtMyEndOf(THREE);
|
||||||
|
|
||||||
|
ourLog.info("Finished");
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
};
|
||||||
|
Future<Integer> result1 = myExecutorService.submit(schedule);
|
||||||
|
Future<Integer> result2 = myExecutorService.submit(schedule);
|
||||||
|
|
||||||
|
assertEquals(1, result1.get());
|
||||||
|
assertEquals(1, result2.get());
|
||||||
|
assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator));
|
||||||
|
assertThat("all progress logged", myProgressEvents, Matchers.hasSize(6));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recordProgress(int threadId) {
|
||||||
|
myProgressEvents.add(Pair.of(threadId, myPhaser.getPhase()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Timeout(5)
|
||||||
|
@Test
|
||||||
|
void phaserWithTwoThreads_canAddThird_sequencContinues() throws InterruptedException, ExecutionException {
|
||||||
|
// given
|
||||||
|
myPhaser = new LockstepEnumPhaser<>(2, Stages.class);
|
||||||
|
|
||||||
|
// run one simple schedule
|
||||||
|
Callable<Integer> schedule1 = ()->{
|
||||||
|
int threadId = 1;
|
||||||
|
ourLog.info("Starting schedule1");
|
||||||
|
myPhaser.assertInPhase(ONE);
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(ONE);
|
||||||
|
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(TWO);
|
||||||
|
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(THREE);
|
||||||
|
|
||||||
|
ourLog.info("Finished schedule1");
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
};
|
||||||
|
// this schedule will start half-way in
|
||||||
|
Callable<Integer> schedule2 = ()->{
|
||||||
|
int threadId = 2;
|
||||||
|
ourLog.info("Starting schedule2");
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(TWO);
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(TWO);
|
||||||
|
|
||||||
|
myPhaser.assertInPhase(THREE);
|
||||||
|
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(THREE);
|
||||||
|
|
||||||
|
ourLog.info("Finished schedule2");
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
};
|
||||||
|
// this schedule will start schedule 2 half-way
|
||||||
|
Callable<Integer> schedule3 = ()->{
|
||||||
|
int threadId = 3;
|
||||||
|
myPhaser.assertInPhase(ONE);
|
||||||
|
ourLog.info("Starting schedule3");
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(ONE);
|
||||||
|
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
// add a new thread to the mix
|
||||||
|
myPhaser.register(); // tell the phaser to expect one more
|
||||||
|
myExecutorService.submit(schedule2);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(TWO);
|
||||||
|
|
||||||
|
recordProgress(threadId);
|
||||||
|
|
||||||
|
myPhaser.arriveAndAwaitSharedEndOf(THREE);
|
||||||
|
|
||||||
|
ourLog.info("Finished schedule3");
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
};
|
||||||
|
Future<Integer> result1 = myExecutorService.submit(schedule1);
|
||||||
|
Future<Integer> result2 = myExecutorService.submit(schedule3);
|
||||||
|
|
||||||
|
assertEquals(1, result1.get());
|
||||||
|
assertEquals(1, result2.get());
|
||||||
|
|
||||||
|
assertThat("progress is ordered", myProgressEvents, OrderMatchers.softOrdered(myProgressStageComparator));
|
||||||
|
assertThat("all progress logged", myProgressEvents, Matchers.hasSize(8));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue