diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8c0ef45ea40..9101440c703 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -50,6 +50,8 @@ Bug Fixes
* SOLR-11445: Overseer should not hang when process bad message. (Cao Manh Dat, shalin)
+* SOLR-11447: ZkStateWriter should process commands in atomic. (Cao Manh Dat, shalin)
+
================== 7.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3589fd8e9aa..74a236d4077 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -65,8 +65,9 @@ import static org.apache.solr.common.params.CommonParams.ID;
public class Overseer implements Closeable {
public static final String QUEUE_OPERATION = "operation";
- public static final int STATE_UPDATE_DELAY = 2000; // delay between cloud state updates
- public static final int STATE_UPDATE_BATCH_SIZE = 10000;
+ // System properties are used in tests to make them run fast
+ public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
+ public static final int STATE_UPDATE_BATCH_SIZE = Integer.getInteger("solr.OverseerStateUpdateBatchSize", 10000);
public static final int STATE_UPDATE_MAX_QUEUE = 20000;
public static final int NUM_RESPONSES_TO_STORE = 10000;
@@ -287,9 +288,7 @@ public class Overseer implements Closeable {
timerContext.stop();
}
if (zkWriteCommands != null) {
- for (ZkWriteCommand zkWriteCommand : zkWriteCommands) {
- clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
- }
+ clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommands, callback);
if (!enableBatching) {
clusterState = zkStateWriter.writePendingUpdates();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 911a9e37f7e..9c810e803e8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -41,11 +42,11 @@ import static java.util.Collections.singletonMap;
* each of which get their own individual state.json in ZK.
*
* Updates to the cluster state are specified using the
- * {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates
+ * {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} method. The class buffers updates
* to reduce the number of writes to ZK. The buffered updates are flushed during enqueueUpdate
* automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates.
*
- * If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()}
+ * If either {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} or {@link #writePendingUpdates()}
* throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
* class is suspect and the current instance of the class should be discarded and a new instance should be created
* and used for any future updates.
@@ -63,14 +64,11 @@ public class ZkStateWriter {
protected final Overseer.Stats stats;
protected Map updates = new HashMap<>();
+ private int numUpdates = 0;
protected ClusterState clusterState = null;
protected boolean isClusterStateModified = false;
protected long lastUpdatedTime = 0;
- // state information which helps us batch writes
- protected int lastStateFormat = -1; // sentinel value
- protected String lastCollectionName = null;
-
/**
* Set to true if we ever get a BadVersionException so that we can disallow future operations
* with this instance
@@ -96,7 +94,7 @@ public class ZkStateWriter {
* be used to force an immediate flush of pending cluster state changes.
*
* @param prevState the cluster state information on which the given cmd
is applied
- * @param cmd the {@link ZkWriteCommand} which specifies the change to be applied to cluster state
+ * @param cmds the list of {@link ZkWriteCommand} which specifies the change to be applied to cluster state in atomic
* @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
* for any callbacks
* @return modified cluster state created after applying cmd
to prevState
. If
@@ -107,48 +105,31 @@ public class ZkStateWriter {
* in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
* must be discarded
*/
- public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception {
+ public ClusterState enqueueUpdate(ClusterState prevState, List cmds, ZkWriteCallback callback) throws IllegalStateException, Exception {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
- if (cmd == NO_OP) return prevState;
-
- if (maybeFlushBefore(cmd)) {
- // we must update the prev state to the new one
- prevState = clusterState = writePendingUpdates();
- if (callback != null) {
- callback.onWrite();
- }
- }
+ if (cmds.isEmpty()) return prevState;
+ if (isNoOps(cmds)) return prevState;
if (callback != null) {
callback.onEnqueue();
}
- /*
- We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE)
- */
- DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
- boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
- boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
-
- if (cmd.collection == null) {
- if (wasPreviouslyStateFormat1) {
+ for (ZkWriteCommand cmd : cmds) {
+ if (cmd == NO_OP) continue;
+ if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
isClusterStateModified = true;
}
- clusterState = prevState.copyWith(cmd.name, null);
- updates.put(cmd.name, null);
- } else {
- if (!isCurrentlyStateFormat1) {
+ prevState = prevState.copyWith(cmd.name, cmd.collection);
+ if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
updates.put(cmd.name, cmd.collection);
+ numUpdates++;
}
- if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
- isClusterStateModified = true;
- }
- clusterState = prevState.copyWith(cmd.name, cmd.collection);
}
+ clusterState = prevState;
- if (maybeFlushAfter(cmd)) {
+ if (maybeFlushAfter()) {
ClusterState state = writePendingUpdates();
if (callback != null) {
callback.onWrite();
@@ -159,35 +140,33 @@ public class ZkStateWriter {
return clusterState;
}
- /**
- * Logic to decide a flush before processing a ZkWriteCommand
- *
- * @param cmd the ZkWriteCommand instance
- * @return true if a flush is required, false otherwise
- */
- protected boolean maybeFlushBefore(ZkWriteCommand cmd) {
- if (cmd.collection == null || lastStateFormat <= 0) {
- return false;
+ private boolean isNoOps(List cmds) {
+ for (ZkWriteCommand cmd : cmds) {
+ if (cmd != NO_OP) return false;
}
- return cmd.collection.getStateFormat() != lastStateFormat;
+ return true;
}
/**
- * Logic to decide a flush after processing a ZkWriteCommand
+ * Check whether {@value ZkStateReader#CLUSTER_STATE} (for stateFormat = 1) get changed given command
+ */
+ private boolean clusterStateGetModifiedWith(ZkWriteCommand command, ClusterState state) {
+ DocCollection previousCollection = state.getCollectionOrNull(command.name);
+ boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
+ boolean isCurrentlyStateFormat1 = command.collection != null && command.collection.getStateFormat() == 1;
+ return wasPreviouslyStateFormat1 || isCurrentlyStateFormat1;
+ }
+ /**
+ * Logic to decide a flush after processing a list of ZkWriteCommand
*
- * @param cmd the ZkWriteCommand instance
* @return true if a flush to ZK is required, false otherwise
*/
- protected boolean maybeFlushAfter(ZkWriteCommand cmd) {
- if (cmd.collection == null)
- return false;
- lastCollectionName = cmd.name;
- lastStateFormat = cmd.collection.getStateFormat();
- return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL || updates.size() > Overseer.STATE_UPDATE_BATCH_SIZE;
+ private boolean maybeFlushAfter() {
+ return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL || numUpdates > Overseer.STATE_UPDATE_BATCH_SIZE;
}
public boolean hasPendingUpdates() {
- return !updates.isEmpty() || isClusterStateModified;
+ return numUpdates != 0 || isClusterStateModified;
}
/**
@@ -235,6 +214,7 @@ public class ZkStateWriter {
}
updates.clear();
+ numUpdates = 0;
}
if (isClusterStateModified) {
@@ -265,14 +245,6 @@ public class ZkStateWriter {
return clusterState;
}
- /**
- * @return time returned by System.nanoTime at which the main cluster state was last written to ZK or 0 if
- * never
- */
- public long getLastUpdatedTime() {
- return lastUpdatedTime;
- }
-
/**
* @return the most up-to-date cluster state until the last enqueueUpdate operation
*/
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 62f97cbdacd..97e8a17e637 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -37,7 +37,9 @@ import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.overseer.NodeMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@@ -389,6 +391,63 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
+ @Test
+ public void testDownNodeFailover() throws Exception {
+ String zkDir = createTempDir("zkData").toFile().getAbsolutePath();
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ MockZKController zkController = null;
+ SolrZkClient zkClient = null;
+ SolrZkClient overseerClient = null;
+
+ try {
+ server.run();
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+ zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ overseerClient = electNewOverseer(server.getZkAddress());
+
+ ZkStateReader reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
+
+ zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
+
+ for (int i = 0; i < 5; i++) {
+ zkController.createCollection("collection" + i, 1);
+ assertNotNull("shard got no id?", zkController.publishState("collection"+i, "core1",
+ "core_node1", "shard1" , Replica.State.ACTIVE, 1));
+ }
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
+ ZkStateReader.NODE_NAME_PROP, "127.0.0.1");
+ List commands = new NodeMutator().downNode(reader.getClusterState(), m);
+
+ ZkDistributedQueue q = Overseer.getStateUpdateQueue(zkClient);
+ // More than Overseer.STATE_UPDATE_DELAY
+ Thread.sleep(2200);
+ q.offer(Utils.toJSON(m));
+
+ verifyReplicaStatus(reader, commands.get(0).name, "shard1", "core_node1", Replica.State.DOWN);
+ overseerClient.close();
+ Thread.sleep(1000); // wait for overseer to get killed
+
+ overseerClient = electNewOverseer(server.getZkAddress());
+ for (int i = 0; i < 5; i++) {
+ verifyReplicaStatus(reader, "collection"+i, "shard1", "core_node1", Replica.State.DOWN);
+ }
+ } finally {
+ close(zkClient);
+ if (zkController != null) {
+ zkController.close();
+ }
+ close(overseerClient);
+ server.shutdown();
+ }
+ }
+
//wait until collections are available
private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException {
int maxIterations = 100;
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index cb0bac54d01..167279d4512 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud.overseer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -88,7 +89,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// create new collection with stateFormat = 1
DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
- writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@@ -111,7 +112,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// Now update the collection to stateFormat = 2
DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
- writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@@ -160,7 +161,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
- writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
reader.forceUpdateCollection("c1");
@@ -211,7 +212,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
// create new collection with stateFormat = 2
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
- writer.enqueueUpdate(reader.getClusterState(), wc, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index f96b5e2adba..436d72e7b5e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -17,8 +17,10 @@
package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
@@ -35,12 +37,37 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkStateWriterTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE = new ZkStateWriter.ZkWriteCallback() {
+ @Override
+ public void onEnqueue() throws Exception {
+
+ }
+
+ @Override
+ public void onWrite() throws Exception {
+ fail("Got unexpected flush");
+ }
+ };
+
+ @BeforeClass
+ public static void setup() {
+ System.setProperty("solr.OverseerStateUpdateDelay", "1000");
+ System.setProperty("solr.OverseerStateUpdateBatchSize", "10");
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ System.clearProperty("solr.OverseerStateUpdateDelay");
+ System.clearProperty("solr.OverseerStateUpdateBatchSize");
+ }
public void testZkStateWriterBatching() throws Exception {
String zkDir = createTempDir("testZkStateWriterBatching").toFile().getAbsolutePath();
@@ -60,64 +87,52 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate();
- ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
-
- assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
- assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
-
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
- // create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
- assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
-
- ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
-
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
- assertFalse("Different (new) collection create can be batched together with another create", writer.maybeFlushBefore(c2));
-
- // simulate three state changes on same collection, all should be batched together before
- assertFalse(writer.maybeFlushBefore(c1));
- assertFalse(writer.maybeFlushBefore(c1));
- assertFalse(writer.maybeFlushBefore(c1));
- // and after too
- assertFalse(writer.maybeFlushAfter(c1));
- assertFalse(writer.maybeFlushAfter(c1));
- assertFalse(writer.maybeFlushAfter(c1));
-
- // simulate three state changes on two different collections with stateFormat=2, all should be batched
- assertFalse(writer.maybeFlushBefore(c1));
- // flushAfter has to be called as it updates the internal batching related info
- assertFalse(writer.maybeFlushAfter(c1));
- assertFalse(writer.maybeFlushBefore(c2));
- assertFalse(writer.maybeFlushAfter(c2));
- assertFalse(writer.maybeFlushBefore(c1));
- assertFalse(writer.maybeFlushAfter(c1));
-
- // create a collection in stateFormat = 1 i.e. inside the main cluster state
ZkWriteCommand c3 = new ZkWriteCommand("c3",
- new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
- clusterState = writer.enqueueUpdate(clusterState, c3, null);
+ new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c3"));
+ ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
- // simulate three state changes in c3, all should be batched
- for (int i = 0; i < 3; i++) {
- assertFalse(writer.maybeFlushBefore(c3));
- assertFalse(writer.maybeFlushAfter(c3));
+ // First write is flushed immediately
+ ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+ clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c1), FAIL_ON_WRITE);
+ clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c2), FAIL_ON_WRITE);
+
+ Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
+ AtomicBoolean didWrite = new AtomicBoolean(false);
+ clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
+ @Override
+ public void onEnqueue() throws Exception {
+
+ }
+
+ @Override
+ public void onWrite() throws Exception {
+ didWrite.set(true);
+ }
+ });
+ assertTrue("Exceed the update delay, should be flushed", didWrite.get());
+
+ for (int i = 0; i <= Overseer.STATE_UPDATE_BATCH_SIZE; i++) {
+ clusterState = writer.enqueueUpdate(clusterState, Collections.singletonList(c3), new ZkStateWriter.ZkWriteCallback() {
+ @Override
+ public void onEnqueue() throws Exception {
+
+ }
+
+ @Override
+ public void onWrite() throws Exception {
+ didWrite.set(true);
+ }
+ });
}
-
- // simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
- // none should be batched together
- assertFalse(writer.maybeFlushBefore(c3));
- assertFalse(writer.maybeFlushAfter(c3));
- assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
- assertFalse(writer.maybeFlushAfter(c1));
- assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
- assertFalse(writer.maybeFlushAfter(c3));
- assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
- assertFalse(writer.maybeFlushAfter(c2));
+ assertTrue("Exceed the update batch size, should be flushed", didWrite.get());
}
} finally {
@@ -152,7 +167,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
- ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@@ -194,7 +209,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
- ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
@@ -238,7 +253,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
// create collection 1 with stateFormat = 1
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
- writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
reader.forceUpdateCollection("c1");
@@ -252,22 +267,23 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
zkClient.setData("/clusterstate.json", data, true);
// enqueue another c1 so that ZkStateWriter has pending updates
- writer.enqueueUpdate(clusterState, c1, null);
+ writer.enqueueUpdate(clusterState, Collections.singletonList(c1), null);
assertTrue(writer.hasPendingUpdates());
- // create collection 2 with stateFormat = 1
+ // Will trigger flush
+ Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
try {
- writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
+ writer.enqueueUpdate(clusterState, Collections.singletonList(c2), null); // we are sending in the old cluster state object
fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) {
// expected
}
try {
- writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) {
// expected
@@ -315,7 +331,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
// create collection 2 with stateFormat = 2
ZkWriteCommand c2 = new ZkWriteCommand("c2",
new DocCollection("c2", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
- state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ state = writer.enqueueUpdate(state, Collections.singletonList(c2), null);
assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
int sharedClusterStateVersion = state.getZkClusterStateVersion();
@@ -333,27 +349,27 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
- // enqueue an update to stateFormat2 collection such that update is pending
- state = writer.enqueueUpdate(state, c2, null);
+ writer.enqueueUpdate(state, Collections.singletonList(c2), null);
assertTrue(writer.hasPendingUpdates());
// get the most up-to-date state
reader.forceUpdateCollection("c2");
state = reader.getClusterState();
- // enqueue a stateFormat=1 collection which should cause a flush
+ // Will trigger flush
+ Thread.sleep(Overseer.STATE_UPDATE_DELAY+100);
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap(), new HashMap(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
try {
- writer.enqueueUpdate(state, c1, null);
+ writer.enqueueUpdate(state, Collections.singletonList(c1), null);
fail("Enqueue should not have succeeded");
} catch (KeeperException.BadVersionException bve) {
// expected
}
try {
- writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
fail("enqueueUpdate after BadVersionException should not have succeeded");
} catch (IllegalStateException e) {
// expected