SOLR-11447: ZkStateWriter should process commands atomically

This commit is contained in:
Cao Manh Dat 2017-10-13 15:23:44 +07:00
parent 6fecf849b6
commit 8a7d04a658
6 changed files with 181 additions and 132 deletions

View File

@ -50,6 +50,8 @@ Bug Fixes
* SOLR-11445: Overseer should not hang when process bad message. (Cao Manh Dat, shalin) * SOLR-11445: Overseer should not hang when process bad message. (Cao Manh Dat, shalin)
* SOLR-11447: ZkStateWriter should process commands atomically. (Cao Manh Dat, shalin)
================== 7.1.0 ================== ================== 7.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -65,8 +65,9 @@ import static org.apache.solr.common.params.CommonParams.ID;
public class Overseer implements Closeable { public class Overseer implements Closeable {
public static final String QUEUE_OPERATION = "operation"; public static final String QUEUE_OPERATION = "operation";
public static final int STATE_UPDATE_DELAY = 2000; // delay between cloud state updates // System properties are used in tests to make them run fast
public static final int STATE_UPDATE_BATCH_SIZE = 10000; public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
public static final int STATE_UPDATE_BATCH_SIZE = Integer.getInteger("solr.OverseerStateUpdateBatchSize", 10000);
public static final int STATE_UPDATE_MAX_QUEUE = 20000; public static final int STATE_UPDATE_MAX_QUEUE = 20000;
public static final int NUM_RESPONSES_TO_STORE = 10000; public static final int NUM_RESPONSES_TO_STORE = 10000;
@ -287,9 +288,7 @@ public class Overseer implements Closeable {
timerContext.stop(); timerContext.stop();
} }
if (zkWriteCommands != null) { if (zkWriteCommands != null) {
for (ZkWriteCommand zkWriteCommand : zkWriteCommands) { clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommands, callback);
clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
}
if (!enableBatching) { if (!enableBatching) {
clusterState = zkStateWriter.writePendingUpdates(); clusterState = zkStateWriter.writePendingUpdates();
} }

View File

@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,11 +42,11 @@ import static java.util.Collections.singletonMap;
* each of which get their own individual state.json in ZK. * each of which get their own individual state.json in ZK.
* *
* Updates to the cluster state are specified using the * Updates to the cluster state are specified using the
* {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates * {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} method. The class buffers updates
* to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code> * to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
* automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates. * automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates.
* *
* If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()} * If either {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} or {@link #writePendingUpdates()}
* throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
* class is suspect and the current instance of the class should be discarded and a new instance should be created * class is suspect and the current instance of the class should be discarded and a new instance should be created
* and used for any future updates. * and used for any future updates.
@ -63,14 +64,11 @@ public class ZkStateWriter {
protected final Overseer.Stats stats; protected final Overseer.Stats stats;
protected Map<String, DocCollection> updates = new HashMap<>(); protected Map<String, DocCollection> updates = new HashMap<>();
private int numUpdates = 0;
protected ClusterState clusterState = null; protected ClusterState clusterState = null;
protected boolean isClusterStateModified = false; protected boolean isClusterStateModified = false;
protected long lastUpdatedTime = 0; protected long lastUpdatedTime = 0;
// state information which helps us batch writes
protected int lastStateFormat = -1; // sentinel value
protected String lastCollectionName = null;
/** /**
* Set to true if we ever get a BadVersionException so that we can disallow future operations * Set to true if we ever get a BadVersionException so that we can disallow future operations
* with this instance * with this instance
@ -96,7 +94,7 @@ public class ZkStateWriter {
* be used to force an immediate flush of pending cluster state changes. * be used to force an immediate flush of pending cluster state changes.
* *
* @param prevState the cluster state information on which the given <code>cmd</code> is applied * @param prevState the cluster state information on which the given <code>cmd</code> is applied
* @param cmd the {@link ZkWriteCommand} which specifies the change to be applied to cluster state * @param cmds the list of {@link ZkWriteCommand} which specifies the change to be applied to cluster state in atomic
* @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used * @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
* for any callbacks * for any callbacks
* @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If * @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
@ -107,48 +105,31 @@ public class ZkStateWriter {
* in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and * in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
* must be discarded * must be discarded
*/ */
public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception { public ClusterState enqueueUpdate(ClusterState prevState, List<ZkWriteCommand> cmds, ZkWriteCallback callback) throws IllegalStateException, Exception {
if (invalidState) { if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used"); throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
} }
if (cmd == NO_OP) return prevState; if (cmds.isEmpty()) return prevState;
if (isNoOps(cmds)) return prevState;
if (maybeFlushBefore(cmd)) {
// we must update the prev state to the new one
prevState = clusterState = writePendingUpdates();
if (callback != null) {
callback.onWrite();
}
}
if (callback != null) { if (callback != null) {
callback.onEnqueue(); callback.onEnqueue();
} }
/* for (ZkWriteCommand cmd : cmds) {
We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE) if (cmd == NO_OP) continue;
*/ if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
if (cmd.collection == null) {
if (wasPreviouslyStateFormat1) {
isClusterStateModified = true; isClusterStateModified = true;
} }
clusterState = prevState.copyWith(cmd.name, null); prevState = prevState.copyWith(cmd.name, cmd.collection);
updates.put(cmd.name, null); if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
} else {
if (!isCurrentlyStateFormat1) {
updates.put(cmd.name, cmd.collection); updates.put(cmd.name, cmd.collection);
numUpdates++;
} }
if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
isClusterStateModified = true;
}
clusterState = prevState.copyWith(cmd.name, cmd.collection);
} }
clusterState = prevState;
if (maybeFlushAfter(cmd)) { if (maybeFlushAfter()) {
ClusterState state = writePendingUpdates(); ClusterState state = writePendingUpdates();
if (callback != null) { if (callback != null) {
callback.onWrite(); callback.onWrite();
@ -159,35 +140,33 @@ public class ZkStateWriter {
return clusterState; return clusterState;
} }
/** private boolean isNoOps(List<ZkWriteCommand> cmds) {
* Logic to decide a flush before processing a ZkWriteCommand for (ZkWriteCommand cmd : cmds) {
* if (cmd != NO_OP) return false;
* @param cmd the ZkWriteCommand instance
* @return true if a flush is required, false otherwise
*/
protected boolean maybeFlushBefore(ZkWriteCommand cmd) {
if (cmd.collection == null || lastStateFormat <= 0) {
return false;
} }
return cmd.collection.getStateFormat() != lastStateFormat; return true;
} }
/** /**
* Logic to decide a flush after processing a ZkWriteCommand * Check whether {@value ZkStateReader#CLUSTER_STATE} (for stateFormat = 1) get changed given command
*/
private boolean clusterStateGetModifiedWith(ZkWriteCommand command, ClusterState state) {
DocCollection previousCollection = state.getCollectionOrNull(command.name);
boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
boolean isCurrentlyStateFormat1 = command.collection != null && command.collection.getStateFormat() == 1;
return wasPreviouslyStateFormat1 || isCurrentlyStateFormat1;
}
/**
* Logic to decide a flush after processing a list of ZkWriteCommand
* *
* @param cmd the ZkWriteCommand instance
* @return true if a flush to ZK is required, false otherwise * @return true if a flush to ZK is required, false otherwise
*/ */
protected boolean maybeFlushAfter(ZkWriteCommand cmd) { private boolean maybeFlushAfter() {
if (cmd.collection == null) return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL || numUpdates > Overseer.STATE_UPDATE_BATCH_SIZE;
return false;
lastCollectionName = cmd.name;
lastStateFormat = cmd.collection.getStateFormat();
return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL || updates.size() > Overseer.STATE_UPDATE_BATCH_SIZE;
} }
public boolean hasPendingUpdates() { public boolean hasPendingUpdates() {
return !updates.isEmpty() || isClusterStateModified; return numUpdates != 0 || isClusterStateModified;
} }
/** /**
@ -235,6 +214,7 @@ public class ZkStateWriter {
} }
updates.clear(); updates.clear();
numUpdates = 0;
} }
if (isClusterStateModified) { if (isClusterStateModified) {
@ -265,14 +245,6 @@ public class ZkStateWriter {
return clusterState; return clusterState;
} }
/**
* @return time returned by System.nanoTime at which the main cluster state was last written to ZK or 0 if
* never
*/
public long getLastUpdatedTime() {
return lastUpdatedTime;
}
/** /**
* @return the most up-to-date cluster state until the last enqueueUpdate operation * @return the most up-to-date cluster state until the last enqueueUpdate operation
*/ */

View File

@ -37,7 +37,9 @@ import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.overseer.NodeMutator;
import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
@ -389,6 +391,63 @@ public class OverseerTest extends SolrTestCaseJ4 {
} }
} }
/**
 * Regression test for SOLR-11447: a DOWNNODE message that touches several collections must end up
 * fully applied even when the overseer is replaced right after the message was offered.
 *
 * <p>The test creates five single-replica collections hosted on node {@code 127.0.0.1}, publishes
 * every replica as ACTIVE, then offers one DOWNNODE message to the state update queue. It checks
 * that the first affected collection is marked DOWN, closes the overseer client, elects a new
 * overseer and finally checks that the replica of every one of the five collections is DOWN.
 *
 * @throws Exception if the ZK test server, the clients or any verification step fails
 */
@Test
public void testDownNodeFailover() throws Exception {
  String zkDir = createTempDir("zkData").toFile().getAbsolutePath();

  ZkTestServer server = new ZkTestServer(zkDir);

  MockZKController zkController = null;
  SolrZkClient zkClient = null;
  SolrZkClient overseerClient = null;
  // Declared outside the try block so it can be released in finally; the reader is closeable.
  ZkStateReader reader = null;

  try {
    server.run();
    AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
    AbstractZkTestCase.makeSolrZkNode(server.getZkHost());

    zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
    ZkController.createClusterZkNodes(zkClient);

    overseerClient = electNewOverseer(server.getZkAddress());

    reader = new ZkStateReader(zkClient);
    reader.createClusterStateWatchersAndUpdate();

    zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");

    for (int i = 0; i < 5; i++) {
      zkController.createCollection("collection" + i, 1);
      assertNotNull("shard got no id?", zkController.publishState("collection" + i, "core1",
          "core_node1", "shard1", Replica.State.ACTIVE, 1));
    }

    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
        ZkStateReader.NODE_NAME_PROP, "127.0.0.1");
    // Computed locally only to learn which collection the overseer will process first.
    List<ZkWriteCommand> commands = new NodeMutator().downNode(reader.getClusterState(), m);

    ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
    // Wait longer than Overseer.STATE_UPDATE_DELAY. The delay is configurable through a system
    // property, so derive the sleep from the constant (2200 ms with the default of 2000).
    Thread.sleep(Overseer.STATE_UPDATE_DELAY + 200);
    q.offer(Utils.toJSON(m));

    verifyReplicaStatus(reader, commands.get(0).name, "shard1", "core_node1", Replica.State.DOWN);
    overseerClient.close();
    Thread.sleep(1000); // wait for overseer to get killed

    overseerClient = electNewOverseer(server.getZkAddress());
    for (int i = 0; i < 5; i++) {
      verifyReplicaStatus(reader, "collection" + i, "shard1", "core_node1", Replica.State.DOWN);
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
    close(zkClient);
    if (zkController != null) {
      zkController.close();
    }
    close(overseerClient);
    server.shutdown();
  }
}
//wait until collections are available //wait until collections are available
private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException { private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException {
int maxIterations = 100; int maxIterations = 100;

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.solr.cloud.overseer; package org.apache.solr.cloud.overseer;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -88,7 +89,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// create new collection with stateFormat = 1 // create new collection with stateFormat = 1
DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE); DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1); ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@ -111,7 +112,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// Now update the collection to stateFormat = 2 // Now update the collection to stateFormat = 2
DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"); DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2); ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
writer.enqueueUpdate(reader.getClusterState(), c2, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@ -160,7 +161,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// create new collection with stateFormat = 2 // create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
reader.forceUpdateCollection("c1"); reader.forceUpdateCollection("c1");
@ -211,7 +212,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// create new collection with stateFormat = 2 // create new collection with stateFormat = 2
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json"); DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
ZkWriteCommand wc = new ZkWriteCommand("c1", state); ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), wc, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));

View File

@ -17,8 +17,10 @@
package org.apache.solr.cloud.overseer; package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
@ -35,12 +37,37 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ZkStateWriterTest extends SolrTestCaseJ4 { public class ZkStateWriterTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
 * Callback that fails the running test as soon as the writer reports a flush. Pass it to
 * enqueueUpdate calls whose updates are expected to stay buffered.
 */
private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE = new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
// enqueueing is always acceptable; nothing to verify
}
@Override
public void onWrite() throws Exception {
// a write notification means the writer flushed when the test expected batching
fail("Got unexpected flush");
}
};
/**
 * Lowers the overseer state-update delay (to 1000) and batch size (to 10) through system
 * properties so the batching test can reach both limits quickly.
 *
 * NOTE(review): Overseer reads these properties via Integer.getInteger when initialising its
 * static final constants, so they presumably only take effect if Overseer has not been loaded
 * earlier in the same JVM -- confirm against the test runner's class-loading order.
 */
@BeforeClass
public static void setup() {
System.setProperty("solr.OverseerStateUpdateDelay", "1000");
System.setProperty("solr.OverseerStateUpdateBatchSize", "10");
}
/**
 * Clears the two overseer system properties set by setup() so they do not remain set
 * for whatever runs next in this JVM.
 */
@AfterClass
public static void cleanup() {
System.clearProperty("solr.OverseerStateUpdateDelay");
System.clearProperty("solr.OverseerStateUpdateBatchSize");
}
public void testZkStateWriterBatching() throws Exception { public void testZkStateWriterBatching() throws Exception {
String zkDir = createTempDir("testZkStateWriterBatching").toFile().getAbsolutePath(); String zkDir = createTempDir("testZkStateWriterBatching").toFile().getAbsolutePath();
@ -60,64 +87,52 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
try (ZkStateReader reader = new ZkStateReader(zkClient)) { try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
// create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1")); new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
ZkWriteCommand c2 = new ZkWriteCommand("c2", ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2")); new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
assertFalse("Different (new) collection create can be batched together with another create", writer.maybeFlushBefore(c2));
// simulate three state changes on same collection, all should be batched together before
assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushBefore(c1));
// and after too
assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushAfter(c1));
// simulate three state changes on two different collections with stateFormat=2, all should be batched
assertFalse(writer.maybeFlushBefore(c1));
// flushAfter has to be called as it updates the internal batching related info
assertFalse(writer.maybeFlushAfter(c1));
assertFalse(writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2));
assertFalse(writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1));
// create a collection in stateFormat = 1 i.e. inside the main cluster state
ZkWriteCommand c3 = new ZkWriteCommand("c3", ZkWriteCommand c3 = new ZkWriteCommand("c3",
new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c3"));
clusterState = writer.enqueueUpdate(clusterState, c3, null); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
// simulate three state changes in c3, all should be batched // First write is flushed immediately
for (int i = 0; i < 3; i++) { ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
assertFalse(writer.maybeFlushBefore(c3)); clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c1), FAIL_ON_WRITE);
assertFalse(writer.maybeFlushAfter(c3)); clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c2), FAIL_ON_WRITE);
Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
AtomicBoolean didWrite = new AtomicBoolean(false);
clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
}
@Override
public void onWrite() throws Exception {
didWrite.set(true);
}
});
assertTrue("Exceed the update delay, should be flushed", didWrite.get());
for (int i = 0; i <= Overseer.STATE_UPDATE_BATCH_SIZE; i++) {
clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
@Override
public void onEnqueue() throws Exception {
}
@Override
public void onWrite() throws Exception {
didWrite.set(true);
}
});
} }
assertTrue("Exceed the update batch size, should be flushed", didWrite.get());
// simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
// none should be batched together
assertFalse(writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
assertFalse(writer.maybeFlushAfter(c1));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
assertFalse(writer.maybeFlushAfter(c3));
assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
assertFalse(writer.maybeFlushAfter(c2));
} }
} finally { } finally {
@ -152,7 +167,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@ -194,7 +209,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json")); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true)); Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@ -238,7 +253,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
// create collection 1 with stateFormat = 1 // create collection 1 with stateFormat = 1
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
writer.enqueueUpdate(reader.getClusterState(), c1, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates(); writer.writePendingUpdates();
reader.forceUpdateCollection("c1"); reader.forceUpdateCollection("c1");
@ -252,22 +267,23 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient.setData("/clusterstate.json", data, true); zkClient.setData("/clusterstate.json", data, true);
// enqueue another c1 so that ZkStateWriter has pending updates // enqueue another c1 so that ZkStateWriter has pending updates
writer.enqueueUpdate(clusterState, c1, null); writer.enqueueUpdate(clusterState, Collections.singletonList(c1), null);
assertTrue(writer.hasPendingUpdates()); assertTrue(writer.hasPendingUpdates());
// create collection 2 with stateFormat = 1 // Will trigger flush
Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
ZkWriteCommand c2 = new ZkWriteCommand("c2", ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2"))); new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
try { try {
writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object writer.enqueueUpdate(clusterState, Collections.singletonList(c2), null); // we are sending in the old cluster state object
fail("Enqueue should not have succeeded"); fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) { } catch (KeeperException.BadVersionException bve) {
// expected // expected
} }
try { try {
writer.enqueueUpdate(reader.getClusterState(), c2, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
fail("enqueueUpdate after BadVersionException should not have succeeded"); fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// expected // expected
@ -315,7 +331,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
// create collection 2 with stateFormat = 2 // create collection 2 with stateFormat = 2
ZkWriteCommand c2 = new ZkWriteCommand("c2", ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2"))); new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
state = writer.enqueueUpdate(reader.getClusterState(), c2, null); state = writer.enqueueUpdate(state, Collections.singletonList(c2), null);
assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
int sharedClusterStateVersion = state.getZkClusterStateVersion(); int sharedClusterStateVersion = state.getZkClusterStateVersion();
@ -333,27 +349,27 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion()); assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion()); assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
// enqueue an update to stateFormat2 collection such that update is pending writer.enqueueUpdate(state, Collections.singletonList(c2), null);
state = writer.enqueueUpdate(state, c2, null);
assertTrue(writer.hasPendingUpdates()); assertTrue(writer.hasPendingUpdates());
// get the most up-to-date state // get the most up-to-date state
reader.forceUpdateCollection("c2"); reader.forceUpdateCollection("c2");
state = reader.getClusterState(); state = reader.getClusterState();
// enqueue a stateFormat=1 collection which should cause a flush // Will trigger flush
Thread.sleep(Overseer.STATE_UPDATE_DELAY+100);
ZkWriteCommand c1 = new ZkWriteCommand("c1", ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE)); new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
try { try {
writer.enqueueUpdate(state, c1, null); writer.enqueueUpdate(state, Collections.singletonList(c1), null);
fail("Enqueue should not have succeeded"); fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) { } catch (KeeperException.BadVersionException bve) {
// expected // expected
} }
try { try {
writer.enqueueUpdate(reader.getClusterState(), c2, null); writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
fail("enqueueUpdate after BadVersionException should not have succeeded"); fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// expected // expected