Terminate linearizability check early on large histories (#44444)

Large histories can be problematic and have the linearizability checker occasionally run OOM. As it's
very difficult to bound the size of the histories just right, this PR will let it instead run for 10 seconds
on large histories and then abort.

Closes #44429
This commit is contained in:
Yannick Welsch 2019-07-17 17:32:07 +02:00
parent a19c7977ac
commit f78e64e3e2
2 changed files with 38 additions and 9 deletions

View File

@ -38,6 +38,8 @@ import org.elasticsearch.discovery.AbstractDisruptionTestCase;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
@ -50,8 +52,10 @@ import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -438,13 +442,24 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
} }
} }
public boolean isLinearizable() { public void assertLinearizable() {
logger.info("--> Linearizability checking history of size: {} for key: {} and initialVersion: {}: {}", history.size(), logger.info("--> Linearizability checking history of size: {} for key: {} and initialVersion: {}: {}", history.size(),
id, initialVersion, history); id, initialVersion, history);
LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion); LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion);
boolean linearizable = false; boolean linearizable = false;
try { try {
linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator()); final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
final AtomicBoolean abort = new AtomicBoolean();
// Large histories can be problematic and have the linearizability checker run OOM
// Bound the time how long the checker can run on such histories (Values empirically determined)
if (history.size() > 300) {
scheduler.schedule(() -> abort.set(true), 10, TimeUnit.SECONDS);
}
linearizable = new LinearizabilityChecker().isLinearizable(spec, history, missingResponseGenerator(), abort::get);
ThreadPool.terminate(scheduler, 1, TimeUnit.SECONDS);
if (abort.get() && linearizable == false) {
linearizable = true; // let the test pass
}
} finally { } finally {
// implicitly test that we can serialize all histories. // implicitly test that we can serialize all histories.
String serializedHistory = base64Serialize(history); String serializedHistory = base64Serialize(history);
@ -454,11 +469,7 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
spec, initialVersion, serializedHistory); spec, initialVersion, serializedHistory);
} }
} }
return linearizable; assertTrue("Must be linearizable", linearizable);
}
public void assertLinearizable() {
assertTrue("Must be linearizable", isLinearizable());
} }
} }

View File

@ -39,6 +39,7 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -227,13 +228,27 @@ public class LinearizabilityChecker {
* @return true iff the history is linearizable w.r.t. the given spec * @return true iff the history is linearizable w.r.t. the given spec
*/ */
public boolean isLinearizable(SequentialSpec spec, History history, Function<Object, Object> missingResponseGenerator) { public boolean isLinearizable(SequentialSpec spec, History history, Function<Object, Object> missingResponseGenerator) {
return isLinearizable(spec, history, missingResponseGenerator, () -> false);
}
/**
* Checks whether the provided history is linearizable with respect to the given sequential specification
*
* @param spec the sequential specification of the datatype
* @param history the history of events to check for linearizability
* @param missingResponseGenerator used to complete the history with missing responses
* @param terminateEarly a condition upon which to terminate early
* @return true iff the history is linearizable w.r.t. the given spec
*/
public boolean isLinearizable(SequentialSpec spec, History history, Function<Object, Object> missingResponseGenerator,
BooleanSupplier terminateEarly) {
history = history.clone(); // clone history before completing it history = history.clone(); // clone history before completing it
history.complete(missingResponseGenerator); // complete history history.complete(missingResponseGenerator); // complete history
final Collection<List<Event>> partitions = spec.partition(history.copyEvents()); final Collection<List<Event>> partitions = spec.partition(history.copyEvents());
return partitions.stream().allMatch(h -> isLinearizable(spec, h)); return partitions.stream().allMatch(h -> isLinearizable(spec, h, terminateEarly));
} }
private boolean isLinearizable(SequentialSpec spec, List<Event> history) { private boolean isLinearizable(SequentialSpec spec, List<Event> history, BooleanSupplier terminateEarly) {
logger.debug("Checking history of size: {}: {}", history.size(), history); logger.debug("Checking history of size: {}: {}", history.size(), history);
Object state = spec.initialState(); // the current state of the datatype Object state = spec.initialState(); // the current state of the datatype
final FixedBitSet linearized = new FixedBitSet(history.size() / 2); // the linearized prefix of the history final FixedBitSet linearized = new FixedBitSet(history.size() / 2); // the linearized prefix of the history
@ -245,6 +260,9 @@ public class LinearizabilityChecker {
Entry entry = headEntry.next; // current entry Entry entry = headEntry.next; // current entry
while (headEntry.next != null) { while (headEntry.next != null) {
if (terminateEarly.getAsBoolean()) {
return false;
}
if (entry.match != null) { if (entry.match != null) {
final Optional<Object> maybeNextState = spec.nextState(state, entry.event.value, entry.match.event.value); final Optional<Object> maybeNextState = spec.nextState(state, entry.event.value, entry.match.event.value);
boolean shouldExploreNextState = false; boolean shouldExploreNextState = false;