From f78e64e3e28e8beaa41138d8c48716a8cafb5356 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 17 Jul 2019 17:32:07 +0200 Subject: [PATCH] Terminate linearizability check early on large histories (#44444) Large histories can be problematic and have the linearizability checker occasionally run OOM. As it's very difficult to bound the size of the histories just right, this PR will let it instead run for 10 seconds on large histories and then abort. Closes #44429 --- .../ConcurrentSeqNoVersioningIT.java | 25 +++++++++++++------ .../coordination/LinearizabilityChecker.java | 22 ++++++++++++++-- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java b/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java index 38255b13a73..3440bf14259 100644 --- a/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java +++ b/server/src/test/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java @@ -38,6 +38,8 @@ import org.elasticsearch.discovery.AbstractDisruptionTestCase; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.io.FileInputStream; import java.io.IOException; @@ -50,8 +52,10 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -438,13 +442,24 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase { } } - public boolean isLinearizable() { + public void assertLinearizable() { logger.info("--> Linearizability checking history of size: {} for key: {} and initialVersion: {}: {}", history.size(), id, initialVersion, history); LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion); boolean linearizable = false; try { - linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator()); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final AtomicBoolean abort = new AtomicBoolean(); + // Large histories can be problematic and have the linearizability checker run OOM + // Bound the time how long the checker can run on such histories (Values empirically determined) + if (history.size() > 300) { + scheduler.schedule(() -> abort.set(true), 10, TimeUnit.SECONDS); + } + linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator(), abort::get); + ThreadPool.terminate(scheduler, 1, TimeUnit.SECONDS); + if (abort.get() && linearizable == false) { + linearizable = true; // let the test pass + } } finally { // implicitly test that we can serialize all histories. String serializedHistory = base64Serialize(history); @@ -454,11 +469,7 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase { spec, initialVersion, serializedHistory); } } - return linearizable; - } - - public void assertLinearizable() { - assertTrue("Must be linearizable", isLinearizable()); + assertTrue("Must be linearizable", linearizable); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java index a854d403557..0abbcf863ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java @@ -39,6 +39,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -227,13 +228,27 @@ public class LinearizabilityChecker { * @return true iff the history is linearizable w.r.t. the given spec */ public boolean isLinearizable(SequentialSpec spec, History history, Function missingResponseGenerator) { + return isLinearizable(spec, history, missingResponseGenerator, () -> false); + } + + /** + * Checks whether the provided history is linearizable with respect to the given sequential specification + * + * @param spec the sequential specification of the datatype + * @param history the history of events to check for linearizability + * @param missingResponseGenerator used to complete the history with missing responses + * @param terminateEarly a condition upon which to terminate early + * @return true iff the history is linearizable w.r.t. the given spec + */ + public boolean isLinearizable(SequentialSpec spec, History history, Function missingResponseGenerator, + BooleanSupplier terminateEarly) { history = history.clone(); // clone history before completing it history.complete(missingResponseGenerator); // complete history final Collection> partitions = spec.partition(history.copyEvents()); - return partitions.stream().allMatch(h -> isLinearizable(spec, h)); + return partitions.stream().allMatch(h -> isLinearizable(spec, h, terminateEarly)); } - private boolean isLinearizable(SequentialSpec spec, List history) { + private boolean isLinearizable(SequentialSpec spec, List history, BooleanSupplier terminateEarly) { logger.debug("Checking history of size: {}: {}", history.size(), history); Object state = spec.initialState(); // the current state of the datatype final FixedBitSet linearized = new FixedBitSet(history.size() / 2); // the linearized prefix of the history @@ -245,6 +260,9 @@ public class LinearizabilityChecker { Entry entry = headEntry.next; // current entry while (headEntry.next != null) { + if (terminateEarly.getAsBoolean()) { + return false; + } if (entry.match != null) { final Optional maybeNextState = spec.nextState(state, entry.event.value, entry.match.event.value); boolean shouldExploreNextState = false;