diff --git a/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java b/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java index 38255b13a73..3440bf14259 100644 --- a/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java +++ b/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java @@ -38,6 +38,8 @@ import org.elasticsearch.discovery.AbstractDisruptionTestCase; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.io.FileInputStream; import java.io.IOException; @@ -50,8 +52,10 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -438,13 +442,24 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase { } } - public boolean isLinearizable() { + public void assertLinearizable() { logger.info("--> Linearizability checking history of size: {} for key: {} and initialVersion: {}: {}", history.size(), id, initialVersion, history); LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion); boolean linearizable = false; try { - linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator()); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final AtomicBoolean abort = new AtomicBoolean(); + // Large histories can be problematic and have the linearizability checker run OOM + // Bound the time how long the checker can run on such histories (Values empirically determined) + if (history.size() > 300) { + scheduler.schedule(() -> abort.set(true), 10, TimeUnit.SECONDS); + } + linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator(), abort::get); + ThreadPool.terminate(scheduler, 1, TimeUnit.SECONDS); + if (abort.get() && linearizable == false) { + linearizable = true; // let the test pass + } } finally { // implicitly test that we can serialize all histories. String serializedHistory = base64Serialize(history); @@ -454,11 +469,7 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase { spec, initialVersion, serializedHistory); } } - return linearizable; - } - - public void assertLinearizable() { - assertTrue("Must be linearizable", isLinearizable()); + assertTrue("Must be linearizable", linearizable); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java index a854d403557..0abbcf863ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java @@ -39,6 +39,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -227,13 +228,27 @@ public class LinearizabilityChecker { * @return true iff the history is linearizable w.r.t. the given spec */ public boolean isLinearizable(SequentialSpec spec, History history, Function missingResponseGenerator) { + return isLinearizable(spec, history, missingResponseGenerator, () -> false); + } + + /** + * Checks whether the provided history is linearizable with respect to the given sequential specification + * + * @param spec the sequential specification of the datatype + * @param history the history of events to check for linearizability + * @param missingResponseGenerator used to complete the history with missing responses + * @param terminateEarly a condition upon which to terminate early + * @return true iff the history is linearizable w.r.t. the given spec + */ + public boolean isLinearizable(SequentialSpec spec, History history, Function missingResponseGenerator, + BooleanSupplier terminateEarly) { history = history.clone(); // clone history before completing it history.complete(missingResponseGenerator); // complete history final Collection> partitions = spec.partition(history.copyEvents()); - return partitions.stream().allMatch(h -> isLinearizable(spec, h)); + return partitions.stream().allMatch(h -> isLinearizable(spec, h, terminateEarly)); } - private boolean isLinearizable(SequentialSpec spec, List history) { + private boolean isLinearizable(SequentialSpec spec, List history, BooleanSupplier terminateEarly) { logger.debug("Checking history of size: {}: {}", history.size(), history); Object state = spec.initialState(); // the current state of the datatype final FixedBitSet linearized = new FixedBitSet(history.size() / 2); // the linearized prefix of the history @@ -245,6 +260,9 @@ public class LinearizabilityChecker { Entry entry = headEntry.next; // current entry while (headEntry.next != null) { + if (terminateEarly.getAsBoolean()) { + return false; + } if (entry.match != null) { final Optional maybeNextState = spec.nextState(state, entry.event.value, entry.match.event.value); boolean shouldExploreNextState = false;