diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 1b195b0f320..0c525b28817 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.NodeConnectionsService; @@ -37,6 +38,8 @@ import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigu import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; +import org.elasticsearch.cluster.coordination.LinearizabilityChecker.History; +import org.elasticsearch.cluster.coordination.LinearizabilityChecker.SequentialSpec; import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,6 +50,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -60,8 +64,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver; +import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.MockGatewayMetaState; @@ -103,8 +107,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; -import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue; -import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; @@ -119,11 +121,11 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_C import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; -import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ALL; import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID; import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_SETTING; import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES; +import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; @@ -945,7 +947,7 @@ public class CoordinatorTests extends ESTestCase { final Builder settingsBuilder = Settings.builder().put(cs.metaData().persistentSettings()); settingsBuilder.put(NO_MASTER_BLOCK_SETTING.getKey(), noMasterBlockSetting); return ClusterState.builder(cs).metaData(MetaData.builder(cs.metaData()).persistentSettings(settingsBuilder.build())).build(); - }); + }, (source, e) -> {}); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing setting update"); leader.disconnect(); @@ -1142,6 +1144,8 @@ public class CoordinatorTests extends ESTestCase { private final Set disconnectedNodes = new HashSet<>(); private final Set blackholedNodes = new HashSet<>(); private final Map committedStatesByVersion = new HashMap<>(); + private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker(); + private final History history = new History(); private final Function defaultPersistedStateSupplier = MockPersistedState::new; @@ -1221,6 +1225,7 @@ public class CoordinatorTests extends ESTestCase { cleanupActions.add(() -> disruptStorage = false); final int randomSteps = scaledRandomIntBetween(10, 10000); + final int keyRange = randomSteps / 50; // for randomized writes and reads logger.info("--> start of safety phase of at least [{}] steps", randomSteps); deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); @@ -1239,13 +1244,22 @@ public class CoordinatorTests extends ESTestCase { } try { - if (rarely()) { + if (finishTime == -1 && randomBoolean() && randomBoolean() && randomBoolean()) { final ClusterNode clusterNode = getAnyNodePreferringLeaders(); + final int key = randomIntBetween(0, keyRange); final int newValue = randomInt(); clusterNode.onNode(() -> { logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", thisStep, newValue, clusterNode.getId()); - clusterNode.submitValue(newValue); + clusterNode.submitValue(key, newValue); + }).run(); + } else if (finishTime == -1 && randomBoolean() && randomBoolean() && randomBoolean()) { + final ClusterNode clusterNode = getAnyNodePreferringLeaders(); + final int key = randomIntBetween(0, keyRange); + clusterNode.onNode(() -> { + logger.debug("----> [runRandomly {}] reading value from [{}]", + thisStep, clusterNode.getId()); + clusterNode.readValue(key); }).run(); } else if (rarely()) { final ClusterNode clusterNode = getAnyNodePreferringLeaders(); @@ -1426,6 +1440,10 @@ public class CoordinatorTests extends ESTestCase { lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration())); assertThat("current configuration is already optimal", leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState)); + + logger.info("checking linearizability of history with size {}: {}", history.size(), history); + assertTrue("history not linearizable: " + history, linearizabilityChecker.isLinearizable(spec, history, i -> null)); + logger.info("linearizability check completed"); } void bootstrapIfNecessary() { @@ -1802,14 +1820,55 @@ public class CoordinatorTests extends ESTestCase { .put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), autoShrinkVotingConfiguration) .build()) .build()) - .build()); + .build(), (source, e) -> {}); } AckCollector submitValue(final long value) { - return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value)); + return submitValue(0, value); } - AckCollector submitUpdateTask(String source, UnaryOperator clusterStateUpdate) { + AckCollector submitValue(final int key, final long value) { + final int eventId = history.invoke(new Tuple<>(key, value)); + return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, key, value), new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + history.respond(eventId, value(oldState, key)); + } + + @Override + public void onNoLongerMaster(String source) { + // in this case, we know for sure that event was not processed by the system and will not change history + // remove event to help avoid bloated history and state space explosion in linearizability checker + history.remove(eventId); + } + + @Override + public void onFailure(String source, Exception e) { + // do not remove event from history, the write might still take place + // instead, complete history when checking for linearizability + } + }); + } + + void readValue(int key) { + final int eventId = history.invoke(new Tuple<>(key, null)); + submitUpdateTask("read value", cs -> ClusterState.builder(cs).build(), new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + history.respond(eventId, value(newState, key)); + } + + @Override + public void onFailure(String source, Exception e) { + // reads do not change state + // remove event to help avoid bloated history and state space explosion in linearizability checker + history.remove(eventId); + } + }); + } + + AckCollector submitUpdateTask(String source, UnaryOperator clusterStateUpdate, + ClusterStateTaskListener taskListener) { final AckCollector ackCollector = new AckCollector(); onNode(() -> { logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source); @@ -1826,6 +1885,13 @@ public class CoordinatorTests extends ESTestCase { @Override public void onFailure(String source, Exception e) { logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); + taskListener.onFailure(source, e); + } + + @Override + public void onNoLongerMaster(String source) { + logger.trace("no longer master: [{}]", source); + taskListener.onNoLongerMaster(source); } @Override @@ -1833,8 +1899,9 @@ public class CoordinatorTests extends ESTestCase { updateCommittedStates(); ClusterState state = committedStatesByVersion.get(newState.version()); assertNotNull("State not committed : " + newState.toString(), state); - assertEquals(value(state), value(newState)); + assertStateEquals(state, newState); logger.trace("successfully published: [{}]", newState); + taskListener.clusterStateProcessed(source, oldState, newState); } }); }).run(); @@ -2068,4 +2135,85 @@ public class CoordinatorTests extends ESTestCase { HANG, } + public ClusterState setValue(ClusterState clusterState, int key, long value) { + return ClusterState.builder(clusterState).metaData( + MetaData.builder(clusterState.metaData()) + .persistentSettings(Settings.builder() + .put(clusterState.metaData().persistentSettings()) + .put("value_" + key, value) + .build()) + .build()) + .build(); + } + + public long value(ClusterState clusterState) { + return value(clusterState, 0); + } + + public long value(ClusterState clusterState, int key) { + return clusterState.metaData().persistentSettings().getAsLong("value_" + key, 0L); + } + + public void assertStateEquals(ClusterState clusterState1, ClusterState clusterState2) { + assertEquals(clusterState1.version(), clusterState2.version()); + assertEquals(clusterState1.term(), clusterState2.term()); + assertEquals(keySet(clusterState1), keySet(clusterState2)); + for (int key : keySet(clusterState1)) { + assertEquals(value(clusterState1, key), value(clusterState2, key)); + } + } + + public Set keySet(ClusterState clusterState) { + return clusterState.metaData().persistentSettings().keySet().stream() + .filter(s -> s.startsWith("value_")).map(s -> Integer.valueOf(s.substring("value_".length()))).collect(Collectors.toSet()); + } + + /** + * Simple register model. Writes are modeled by providing an integer input. Reads are modeled by providing null as input. + * Responses that time out are modeled by returning null. Successful writes return the previous value of the register. + */ + private final SequentialSpec spec = new LinearizabilityChecker.KeyedSpec() { + @Override + public Object getKey(Object value) { + return ((Tuple) value).v1(); + } + + @Override + public Object getValue(Object value) { + return ((Tuple) value).v2(); + } + + @Override + public Object initialState() { + return 0L; + } + + @Override + public Optional nextState(Object currentState, Object input, Object output) { + // null input is read, non-null is write + if (input == null) { + // history is completed with null, simulating timeout, which assumes that read went through + if (output == null || currentState.equals(output)) { + return Optional.of(currentState); + } + return Optional.empty(); + } else { + if (output == null || currentState.equals(output)) { + // history is completed with null, simulating timeout, which assumes that write went through + return Optional.of(input); + } + return Optional.empty(); + } + } + }; + + public void testRegisterSpecConsistency() { + assertThat(spec.initialState(), equalTo(0L)); + assertThat(spec.nextState(7, 42, 7), equalTo(Optional.of(42))); // successful write 42 returns previous value 7 + assertThat(spec.nextState(7, 42, null), equalTo(Optional.of(42))); // write 42 times out + assertThat(spec.nextState(7, null, 7), equalTo(Optional.of(7))); // successful read + assertThat(spec.nextState(7, null, null), equalTo(Optional.of(7))); // read times out + assertThat(spec.nextState(7, null, 42), equalTo(Optional.empty())); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java new file mode 100644 index 00000000000..94188c0fa5a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LinearizabilityChecker.java @@ -0,0 +1,376 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.common.collect.Tuple; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +/** + * Basic implementation of the Wing and Gong Graph Search Algorithm, following the descriptions in + * Gavin Lowe: Testing for linearizability + * Concurrency and Computation: Practice and Experience 29, 4 (2017). http://dx.doi.org/10.1002/cpe.3928 + * Alex Horn and Daniel Kroening: Faster linearizability checking via P-compositionality + * FORTE (2015). http://dx.doi.org/10.1007/978-3-319-19195-9_4 + */ +public class LinearizabilityChecker { + + /** + * Sequential specification of a datatype. Used as input for the linearizability checker. + * All parameter and return values should be immutable and have proper equals / hashCode implementations + */ + public interface SequentialSpec { + /** + * Returns the initial state of the datatype + */ + Object initialState(); + + /** + * Next-state function, checking whether transitioning the datatype in the given state under the provided input and output is valid. + * + * @param currentState the current state of the datatype + * @param input the input, associated with the given invocation event + * @param output the output, associated with the corresponding response event + * @return the next state, if the given current state, input and output are a valid transition, or Optional.empty() otherwise + */ + Optional nextState(Object currentState, Object input, Object output); + + /** + * For compositional checking, the history can be partitioned into sub-histories + * + * @param events the history of events to partition + * @return the partitioned history + */ + default Collection> partition(List events) { + return Collections.singleton(events); + } + } + + /** + * Sequential specification of a datatype that allows for keyed access, + * providing compositional checking (see {@link SequentialSpec#partition(List)}). + */ + public interface KeyedSpec extends SequentialSpec { + /** + * extracts the key from the given keyed invocation input value + */ + Object getKey(Object value); + + /** + * extracts the key-less value from the given keyed invocation input value + */ + Object getValue(Object value); + + @Override + default Collection> partition(List events) { + final Map> keyedPartitions = new HashMap<>(); + final Map matches = new HashMap<>(); + for (Event event : events) { + if (event.type == EventType.INVOCATION) { + final Object key = getKey(event.value); + final Object val = getValue(event.value); + final Event unfoldedEvent = new Event(EventType.INVOCATION, val, event.id); + keyedPartitions.computeIfAbsent(key, k -> new ArrayList<>()).add(unfoldedEvent); + matches.put(event.id, key); + } else { + final Object key = matches.get(event.id); + keyedPartitions.get(key).add(event); + } + } + return keyedPartitions.values(); + } + } + + /** + * Sequence of invocations and responses, recording the run of a concurrent system. + */ + public static class History { + private final List events; + private int nextId; + + public History() { + events = new ArrayList<>(); + nextId = 0; + } + + /** + * Appends a new invocation event to the history + * + * @param input the input value associated with the invocation event + * @return an id that can be used to record the corresponding response event + */ + public int invoke(Object input) { + final int id = nextId++; + events.add(new Event(EventType.INVOCATION, input, id)); + return id; + } + + /** + * Appends a new response event to the history + * + * @param id the id of the corresponding invocation event + * @param output the output value associated with the response event + */ + public void respond(int id, Object output) { + events.add(new Event(EventType.RESPONSE, output, id)); + } + + /** + * Removes the events with the corresponding id from the history + * + * @param id the value of the id to remove + */ + public void remove(int id) { + events.removeIf(e -> e.id == id); + } + + /** + * Completes the history with response events for invocations that are missing corresponding responses + * + * @param missingResponseGenerator a function from invocation input to response output, used to generate the corresponding response + */ + public void complete(Function missingResponseGenerator) { + final Map uncompletedInvocations = new HashMap<>(); + for (Event event : events) { + if (event.type == EventType.INVOCATION) { + uncompletedInvocations.put(event.id, event); + } else { + final Event removed = uncompletedInvocations.remove(event.id); + if (removed == null) { + throw new IllegalArgumentException("history not well-formed: " + events); + } + } + } + for (Map.Entry entry : uncompletedInvocations.entrySet()) { + events.add(new Event(EventType.RESPONSE, missingResponseGenerator.apply(entry.getValue().value), entry.getKey())); + } + } + + @Override + public History clone() { + final History history = new History(); + history.events.addAll(events); + history.nextId = nextId; + return history; + } + + /** + * Returns the number of recorded events + */ + public int size() { + return events.size(); + } + + @Override + public String toString() { + return "History{" + + "events=" + events + + ", nextId=" + nextId + + '}'; + } + } + + /** + * Checks whether the provided history is linearizable with respect to the given sequential specification + * + * @param spec the sequential specification of the datatype + * @param history the history of events to check for linearizability + * @param missingResponseGenerator used to complete the history with missing responses + * @return true iff the history is linearizable w.r.t. the given spec + */ + public boolean isLinearizable(SequentialSpec spec, History history, Function missingResponseGenerator) { + history = history.clone(); // clone history before completing it + history.complete(missingResponseGenerator); // complete history + final Collection> partitions = spec.partition(history.events); + return partitions.stream().allMatch(h -> isLinearizable(spec, h)); + } + + private boolean isLinearizable(SequentialSpec spec, List history) { + Object state = spec.initialState(); // the current state of the datatype + final FixedBitSet linearized = new FixedBitSet(history.size() / 2); // the linearized prefix of the history + + final Set> cache = new HashSet<>(); // cache of explored pairs + final Deque> calls = new LinkedList<>(); // path we're currently exploring + + final Entry headEntry = createLinkedEntries(history); + Entry entry = headEntry.next; // current entry + + while (headEntry.next != null) { + if (entry.match != null) { + final Optional maybeNextState = spec.nextState(state, entry.event.value, entry.match.event.value); + boolean shouldExploreNextState = false; + if (maybeNextState.isPresent()) { + // check if we have already explored this linearization + final FixedBitSet updatedLinearized = linearized.clone(); + updatedLinearized.set(entry.id); + shouldExploreNextState = cache.add(new Tuple<>(maybeNextState.get(), updatedLinearized)); + } + if (shouldExploreNextState) { + calls.push(new Tuple<>(entry, state)); + state = maybeNextState.get(); + linearized.set(entry.id); + entry.lift(); + entry = headEntry.next; + } else { + entry = entry.next; + } + } else { + if (calls.isEmpty()) { + return false; + } + final Tuple top = calls.pop(); + entry = top.v1(); + state = top.v2(); + linearized.clear(entry.id); + entry.unlift(); + entry = entry.next; + } + } + return true; + } + + /** + * Convenience method for {@link #isLinearizable(SequentialSpec, History, Function)} that requires the history to be complete + */ + public boolean isLinearizable(SequentialSpec spec, History history) { + return isLinearizable(spec, history, o -> { + throw new IllegalArgumentException("history is not complete"); + }); + } + + /** + * Creates the internal linked data structure used by the linearizability checker. + * Generates contiguous internal ids for the events so that they can be efficiently recorded in bit sets. + */ + private static Entry createLinkedEntries(List history) { + if (history.size() % 2 != 0) { + throw new IllegalArgumentException("mismatch between number of invocations and responses"); + } + + // first, create entries and link response events to invocation events + final Map matches = new HashMap<>(); // map from event id to matching response entry + final Entry[] entries = new Entry[history.size()]; + int nextInternalId = (history.size() / 2) - 1; + for (int i = history.size() - 1; i >= 0; i--) { + final Event elem = history.get(i); + if (elem.type == EventType.RESPONSE) { + final Entry entry = entries[i] = new Entry(elem, null, nextInternalId--); + final Entry prev = matches.put(elem.id, entry); + if (prev != null) { + throw new IllegalArgumentException("duplicate response with id " + elem.id); + } + } else { + final Entry matchingResponse = matches.get(elem.id); + if (matchingResponse == null) { + throw new IllegalArgumentException("no matching response found for " + elem); + } + entries[i] = new Entry(elem, matchingResponse, matchingResponse.id); + } + } + + // sanity check + if (nextInternalId != -1) { + throw new IllegalArgumentException("id mismatch"); + } + + // now link entries together in history order, and add a sentinel node at the beginning + Entry first = new Entry(null, null, -1); + Entry lastEntry = first; + for (Entry entry : entries) { + lastEntry.next = entry; + entry.prev = lastEntry; + lastEntry = entry; + } + + return first; + } + + enum EventType { + INVOCATION, + RESPONSE + } + + public static class Event { + public final EventType type; + public final Object value; + public final int id; + + public Event(EventType type, Object value, int id) { + this.type = type; + this.value = value; + this.id = id; + } + + @Override + public String toString() { + return "Event{" + + "type=" + type + + ", value=" + value + + ", id=" + id + + '}'; + } + } + + static class Entry { + final Event event; + final Entry match; // null if current entry is a response, non-null if it's an invocation + final int id; // internal id, distinct from Event.id + Entry prev; + Entry next; + + Entry(Event event, Entry match, int id) { + this.event = event; + this.match = match; + this.id = id; + } + + // removes this entry from the surrounding structures + void lift() { + prev.next = next; + next.prev = prev; + match.prev.next = match.next; + if (match.next != null) { + match.next.prev = match.prev; + } + } + + // reinserts this entry into the surrounding structures + void unlift() { + match.prev.next = match; + if (match.next != null) { + match.next.prev = match; + } + prev.next = this; + next.prev = this; + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LinearizabilityCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LinearizabilityCheckerTests.java new file mode 100644 index 00000000000..4b2acd72877 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LinearizabilityCheckerTests.java @@ -0,0 +1,271 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.cluster.coordination.LinearizabilityChecker.History; +import org.elasticsearch.cluster.coordination.LinearizabilityChecker.KeyedSpec; +import org.elasticsearch.cluster.coordination.LinearizabilityChecker.SequentialSpec; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.util.Optional; + +import static org.hamcrest.Matchers.equalTo; + +public class LinearizabilityCheckerTests extends ESTestCase { + + final LinearizabilityChecker checker = new LinearizabilityChecker(); + + /** + * Simple specification of a lock that can be exactly locked once. There is no unlocking. + * Input is always null (and represents lock acquisition), output is a boolean whether lock was acquired. + */ + final SequentialSpec lockSpec = new SequentialSpec() { + + @Override + public Object initialState() { + return false; + } + + @Override + public Optional nextState(Object currentState, Object input, Object output) { + if (input != null) { + throw new AssertionError("invalid history: input must be null"); + } + if (output instanceof Boolean == false) { + throw new AssertionError("invalid history: output must be boolean"); + } + if (false == (boolean) currentState) { + if (false == (boolean) output) { + return Optional.empty(); + } + return Optional.of(true); + } else if (false == (boolean) output) { + return Optional.of(currentState); + } + return Optional.empty(); + } + }; + + public void testLockConsistent() { + assertThat(lockSpec.initialState(), equalTo(false)); + assertThat(lockSpec.nextState(false, null, true), equalTo(Optional.of(true))); + assertThat(lockSpec.nextState(false, null, false), equalTo(Optional.empty())); + assertThat(lockSpec.nextState(true, null, false), equalTo(Optional.of(true))); + assertThat(lockSpec.nextState(true, null, true), equalTo(Optional.empty())); + } + + public void testLockWithLinearizableHistory1() { + final History history = new History(); + int call0 = history.invoke(null); // 0: acquire lock + history.respond(call0, true); // 0: lock acquisition succeeded + int call1 = history.invoke(null); // 1: acquire lock + history.respond(call1, false); // 0: lock acquisition failed + assertTrue(checker.isLinearizable(lockSpec, history)); + } + + public void testLockWithLinearizableHistory2() { + final History history = new History(); + int call0 = history.invoke(null); // 0: acquire lock + int call1 = history.invoke(null); // 1: acquire lock + history.respond(call0, false); // 0: lock acquisition failed + history.respond(call1, true); // 0: lock acquisition succeeded + assertTrue(checker.isLinearizable(lockSpec, history)); + } + + public void testLockWithLinearizableHistory3() { + final History history = new History(); + int call0 = history.invoke(null); // 0: acquire lock + int call1 = history.invoke(null); // 1: acquire lock + history.respond(call0, true); // 0: lock acquisition succeeded + history.respond(call1, false); // 0: lock acquisition failed + assertTrue(checker.isLinearizable(lockSpec, history)); + } + + public void testLockWithNonLinearizableHistory() { + final History history = new History(); + int call0 = history.invoke(null); // 0: acquire lock + history.respond(call0, false); // 0: lock acquisition failed + int call1 = history.invoke(null); // 1: acquire lock + history.respond(call1, true); // 0: lock acquisition succeeded + assertFalse(checker.isLinearizable(lockSpec, history)); + } + + /** + * Simple specification of a read/write register. + * Writes are modeled as integer inputs (with corresponding null responses) and + * reads are modeled as null inputs with integer outputs. + */ + final SequentialSpec registerSpec = new SequentialSpec() { + + @Override + public Object initialState() { + return 0; + } + + @Override + public Optional nextState(Object currentState, Object input, Object output) { + if ((input == null) == (output == null)) { + throw new AssertionError("invalid history: exactly one of input or output must be null"); + } + if (input != null) { + return Optional.of(input); + } else if (output.equals(currentState)) { + return Optional.of(currentState); + } + return Optional.empty(); + } + }; + + public void testRegisterConsistent() { + assertThat(registerSpec.initialState(), equalTo(0)); + assertThat(registerSpec.nextState(7, 42, null), equalTo(Optional.of(42))); + assertThat(registerSpec.nextState(7, null, 7), equalTo(Optional.of(7))); + assertThat(registerSpec.nextState(7, null, 42), equalTo(Optional.empty())); + } + + public void testRegisterWithLinearizableHistory() { + final History history = new History(); + int call0 = history.invoke(42); // 0: invoke write 42 + int call1 = history.invoke(null); // 1: invoke read + int call2 = history.invoke(null); // 2: invoke read + history.respond(call2, 0); // 2: read returns 0 + history.respond(call1, 42); // 1: read returns 42 + + expectThrows(IllegalArgumentException.class, () -> checker.isLinearizable(registerSpec, history)); + assertTrue(checker.isLinearizable(registerSpec, history, i -> null)); + + history.respond(call0, null); // 0: write returns + assertTrue(checker.isLinearizable(registerSpec, history)); + } + + public void testRegisterWithNonLinearizableHistory() { + final History history = new History(); + int call0 = history.invoke(42); // 0: invoke write 42 + int call1 = history.invoke(null); // 1: invoke read + history.respond(call1, 42); // 1: read returns 42 + int call2 = history.invoke(null); // 2: invoke read + history.respond(call2, 0); // 2: read returns 0, not allowed + + expectThrows(IllegalArgumentException.class, () -> checker.isLinearizable(registerSpec, history)); + assertFalse(checker.isLinearizable(registerSpec, history, i -> null)); + + history.respond(call0, null); // 0: write returns + assertFalse(checker.isLinearizable(registerSpec, history)); + } + + public void testRegisterObservedSequenceOfUpdatesWitLinearizableHistory() { + final History history = new History(); + int call0 = history.invoke(42); // 0: invoke write 42 + int call1 = history.invoke(43); // 1: invoke write 43 + int call2 = history.invoke(null); // 2: invoke read + history.respond(call2, 42); // 1: read returns 42 + int call3 = history.invoke(null); // 3: invoke read + history.respond(call3, 43); // 3: read returns 43 + int call4 = history.invoke(null); // 4: invoke read + history.respond(call4, 43); // 4: read returns 43 + + history.respond(call0, null); // 0: write returns + history.respond(call1, null); // 1: write returns + + assertTrue(checker.isLinearizable(registerSpec, history)); + } + + public void testRegisterObservedSequenceOfUpdatesWithNonLinearizableHistory() { + final History history = new History(); + int call0 = history.invoke(42); // 0: invoke write 42 + int call1 = history.invoke(43); // 1: invoke write 43 + int call2 = history.invoke(null); // 2: invoke read + history.respond(call2, 42); // 1: read returns 42 + int call3 = history.invoke(null); // 3: invoke read + history.respond(call3, 43); // 3: read returns 43 + int call4 = history.invoke(null); // 4: invoke read + history.respond(call4, 42); // 4: read returns 42, not allowed + + history.respond(call0, null); // 0: write returns + history.respond(call1, null); // 1: write returns + + assertFalse(checker.isLinearizable(registerSpec, history)); + } + + final SequentialSpec multiRegisterSpec = new KeyedSpec() { + + @Override + public Object getKey(Object value) { + return ((Tuple) value).v1(); + } + + @Override + public Object getValue(Object value) { + return ((Tuple) value).v2(); + } + + @Override + public Object initialState() { + return registerSpec.initialState(); + } + + @Override + public Optional nextState(Object currentState, Object input, Object output) { + return registerSpec.nextState(currentState, input, output); + } + }; + + public void testMultiRegisterWithLinearizableHistory() { + final History history = new History(); + int callX0 = history.invoke(new Tuple<>("x", 42)); // 0: invoke write 42 on key x + int callX1 = history.invoke(new Tuple<>("x", null)); // 1: invoke read on key x + int callY0 = history.invoke(new Tuple<>("y", 42)); // 0: invoke write 42 on key y + int callY1 = history.invoke(new Tuple<>("y", null)); // 1: invoke read on key y + int callX2 = history.invoke(new Tuple<>("x", null)); // 2: invoke read on key x + int callY2 = history.invoke(new Tuple<>("y", null)); // 2: invoke read on key y + history.respond(callX2, 0); // 2: read returns 0 on key x + history.respond(callY2, 0); // 2: read returns 0 on key y + history.respond(callY1, 42); // 1: read returns 42 on key y + history.respond(callX1, 42); // 1: read returns 42 on key x + + expectThrows(IllegalArgumentException.class, () -> checker.isLinearizable(multiRegisterSpec, history)); + assertTrue(checker.isLinearizable(multiRegisterSpec, history, i -> null)); + + history.respond(callX0, null); // 0: write returns on key x + history.respond(callY0, null); // 0: write returns on key y + assertTrue(checker.isLinearizable(multiRegisterSpec, history)); + } + + public void testMultiRegisterWithNonLinearizableHistory() { + final History history = new History(); + int callX0 = history.invoke(new Tuple<>("x", 42)); // 0: invoke write 42 on key x + int callX1 = history.invoke(new Tuple<>("x", null)); // 1: invoke read on key x + int callY0 = history.invoke(new Tuple<>("y", 42)); // 0: invoke write 42 on key y + int callY1 = history.invoke(new Tuple<>("y", null)); // 1: invoke read on key y + int callX2 = history.invoke(new Tuple<>("x", null)); // 2: invoke read on key x + history.respond(callY1, 42); // 1: read returns 42 on key y + int callY2 = history.invoke(new Tuple<>("y", null)); // 2: invoke read on key y + history.respond(callX2, 0); // 2: read returns 0 on key x + history.respond(callY2, 0); // 2: read returns 0 on key y, not allowed + history.respond(callX1, 42); // 1: read returns 42 on key x + + expectThrows(IllegalArgumentException.class, () -> checker.isLinearizable(multiRegisterSpec, history)); + assertFalse(checker.isLinearizable(multiRegisterSpec, history, i -> null)); + + history.respond(callX0, null); // 0: write returns on key x + history.respond(callY0, null); // 0: write returns on key y + assertFalse(checker.isLinearizable(multiRegisterSpec, history)); + } +}